Prevent objective context from being executed multiple times

Consider the following sequence:
1. Flow objective issued
2. Cache timeout since the objective is not processed in time.
   onError()() will be triggered with ObjectiveError.INSTALLATIONTIMEOUT
3. While 2 is still being processed, the objective actually completed.
   onSuccess() will be triggered and therefore the same objective will be dequeued and executed again

The unit test was also wrong.
One objective context should only be executed once no matter the objective succeeded or failed.

In addition, we also fix unit test.

The execution time was originally set to a value that is too close to
the timeout. Therefore, there is chance that it will actually make it
in time. The purpose of this patch is to further delayed the execution
time and therefore get a better chance to create a true timeout.

Change-Id: I2501903bebee6fd9eefb03a71042c4a06ba4c42f
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index f625e9c..4f8404b 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -53,6 +53,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -154,20 +155,7 @@
         // Inject ObjectiveContext such that we can get notified when it is completed
         Objective.Builder objBuilder = originalObjective.copy();
         Optional<ObjectiveContext> originalContext = originalObjective.context();
-        ObjectiveContext context = new ObjectiveContext() {
-            @Override
-            public void onSuccess(Objective objective) {
-                log.trace("Flow objective onSuccess {}", objective);
-                dequeue(deviceId, objective, null);
-                originalContext.ifPresent(c -> c.onSuccess(objective));
-            }
-            @Override
-            public void onError(Objective objective, ObjectiveError error) {
-                log.warn("Flow objective onError {}. Reason = {}", objective, error);
-                dequeue(deviceId, objective, error);
-                originalContext.ifPresent(c -> c.onError(objective, error));
-            }
-        };
+        ObjectiveContext context = new InOrderObjectiveContext(deviceId, originalContext.orElse(null));
 
         // Preserve Objective.Operation
         Objective objective;
@@ -431,4 +419,42 @@
             }
         }
     }
+
+    final class InOrderObjectiveContext implements ObjectiveContext {
+        private final DeviceId deviceId;
+        private final ObjectiveContext originalContext;
+        // Prevent context from being executed multiple times.
+        // E.g. when the context actually succeed after the cache timeout
+        private final AtomicBoolean done;
+
+        InOrderObjectiveContext(DeviceId deviceId, ObjectiveContext originalContext) {
+            this.deviceId = deviceId;
+            this.originalContext = originalContext;
+            this.done = new AtomicBoolean(false);
+        }
+
+        @Override
+        public void onSuccess(Objective objective) {
+            log.trace("Flow objective onSuccess {}", objective);
+
+            if (!done.getAndSet(true)) {
+                dequeue(deviceId, objective, null);
+                if (originalContext != null) {
+                    originalContext.onSuccess(objective);
+                }
+            }
+
+        }
+        @Override
+        public void onError(Objective objective, ObjectiveError error) {
+            log.warn("Flow objective onError {}. Reason = {}", objective, error);
+
+            if (!done.getAndSet(true)) {
+                dequeue(deviceId, objective, error);
+                if (originalContext != null) {
+                    originalContext.onError(objective, error);
+                }
+            }
+        }
+    }
 }
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index 459ea08..32c9f95 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -258,12 +258,12 @@
         // Reduce timeout so the unit test doesn't have to wait many seconds
         internalSetup(TIMEOUT_THRESH);
 
-        expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(2);
+        expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(1);
         expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(2);
         replay(mgr.flowObjectiveStore);
 
         // Force this objective to time out
-        offset = mgr.objTimeoutMs * 2;
+        offset = mgr.objTimeoutMs * 3;
 
         expectFwdObjsTimeout.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));