Avoid head-of-queue blocking in InOrderFlowObjectiveManager

Change-Id: Ifec6f861634a0a7c335d0591861391b03c36f854
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index bb89ced..7ce1c16 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -26,6 +26,7 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.MplsLabel;
 import org.onlab.packet.VlanId;
+import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.DefaultApplicationId;
 import org.onosproject.net.DeviceId;
@@ -33,6 +34,8 @@
 import org.onosproject.net.behaviour.NextGroup;
 import org.onosproject.net.behaviour.Pipeliner;
 import org.onosproject.net.behaviour.PipelinerAdapter;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.TrafficSelector;
@@ -46,6 +49,7 @@
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.flowobjective.ObjectiveEvent;
 
@@ -53,6 +57,7 @@
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
@@ -89,8 +94,10 @@
     private static final NextGroup NGRP2 = () -> new byte[] {0x02, 0x03};
 
     // Delay flow objectives OFFSET + rand(0, BOUND) millis
-    private static final int OFFSET = 10; // ms
-    private static final int BOUND = 40; // ms
+    private static final int DEFAULT_OFFSET = 10; // ms
+    private static final int DEFAULT_BOUND = 40; // ms
+    private static int offset = DEFAULT_OFFSET;
+    private static int bound = DEFAULT_BOUND;
 
     private static final FilteringObjective FILT1 = buildFilteringObjective(P2, V3, M3, 1).add();
     private static final FilteringObjective FILT2 = buildFilteringObjective(P2, V2, M2, 2).add();
@@ -124,6 +131,16 @@
     private List<ForwardingObjective> expectFwdObjs = Lists.newCopyOnWriteArrayList(
             Lists.newArrayList(FWD1, FWD2, FWD3, FWD4, FWD5, FWD6));
 
+    private static boolean timeout = false;
+    private static final ForwardingObjective FWD11 = buildFwdObjective(S1, NID2).add(new ObjectiveContext() {
+        @Override
+        public void onError(Objective objective, ObjectiveError error) {
+            timeout = error.equals(ObjectiveError.INSTALLATIONTIMEOUT);
+        }
+    });
+    private List<ForwardingObjective> expectFwdObjsTimeout = Lists.newCopyOnWriteArrayList(
+            Lists.newArrayList(FWD11, FWD1, FWD2));
+
     private List<Objective> actualObjs = Lists.newCopyOnWriteArrayList();
 
     private Pipeliner pipeliner = new PipelinerAdapter() {
@@ -163,7 +180,7 @@
          */
         private void recordObjective(Objective obj) {
             try {
-                Thread.sleep(new Random().nextInt(BOUND) + OFFSET);
+                Thread.sleep(new Random().nextInt(bound) + offset);
                 actualObjs.add(obj);
                 obj.context().ifPresent(c -> c.onSuccess(obj));
             } catch (Exception e) {
@@ -177,17 +194,25 @@
         mgr = new InOrderFlowObjectiveManager();
         mgr.pipeliners.put(DEV1, pipeliner);
         mgr.executorService = newFixedThreadPool(4, groupedThreads("foo", "bar"));
+        mgr.cfgService = createMock(ComponentConfigService.class);
+        mgr.deviceService = createMock(DeviceService.class);
+        mgr.driverService = createMock(DriverService.class);
         mgr.flowObjectiveStore = createMock(FlowObjectiveStore.class);
+        mgr.activate();
 
+        reset(mgr.flowObjectiveStore);
+        timeout = false;
+        offset = DEFAULT_OFFSET;
+        bound = DEFAULT_BOUND;
         actualObjs.clear();
     }
 
     @Test
-    public void filter() throws Exception {
+    public void filter() {
         expectFiltObjs.forEach(filtObj -> mgr.filter(DEV1, filtObj));
 
         // Wait for the pipeline operation to complete
-        int expectedTime = (BOUND + OFFSET) * 7;
+        int expectedTime = (bound + offset) * 7;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFiltObjs.size(), actualObjs.size()));
 
         assertTrue(actualObjs.indexOf(FILT1) < actualObjs.indexOf(FILT2));
@@ -198,7 +223,7 @@
     }
 
     @Test
-    public void forward() throws Exception {
+    public void forward() {
         expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(3);
         expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(3);
         replay(mgr.flowObjectiveStore);
@@ -206,7 +231,7 @@
         expectFwdObjs.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));
 
         // Wait for the pipeline operation to complete
-        int expectedTime = (BOUND + OFFSET) * 6;
+        int expectedTime = (bound + offset) * 6;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjs.size(), actualObjs.size()));
 
         assertTrue(actualObjs.indexOf(FWD1) < actualObjs.indexOf(FWD3));
@@ -218,7 +243,28 @@
     }
 
     @Test
-    public void forwardPending() throws Exception {
+    public void forwardTimeout() {
+        expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(1);
+        expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(2);
+        replay(mgr.flowObjectiveStore);
+
+        // recordObjective() sleeps at least 'offset' ms; exceed OBJ_TIMEOUT_MS to force a timeout
+        offset = InOrderFlowObjectiveManager.OBJ_TIMEOUT_MS + 500;
+
+        expectFwdObjsTimeout.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));
+
+        // Wait until all three objectives have been recorded, despite the enlarged delay
+        int expectedTime = (bound + offset) * 3;
+        assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjsTimeout.size(), actualObjs.size()));
+
+        assertTrue(timeout); // set by FWD11's onError callback on INSTALLATIONTIMEOUT
+        assertTrue(actualObjs.indexOf(FWD11) < actualObjs.indexOf(FWD1)); // FWD11 was recorded before FWD1
+
+        verify(mgr.flowObjectiveStore);
+    }
+
+    @Test
+    public void forwardPending() {
         // Note: current logic will double check if the next obj need to be queued
         //       it does not check when resubmitting pending next back to the queue
         expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(null).times(2);
@@ -234,7 +280,7 @@
         mgr.next(DEV1, NEXT2);
 
         // Wait for the pipeline operation to complete
-        int expectedTime = (BOUND + OFFSET) * 8;
+        int expectedTime = (bound + offset) * 8;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjs.size() + 2, actualObjs.size()));
 
         assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(FWD1));
@@ -248,7 +294,7 @@
     }
 
     @Test
-    public void next() throws Exception {
+    public void next() {
         // Note: ADD operation won't query this
         expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(3);
         expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(3);
@@ -257,7 +303,7 @@
         expectNextObjs.forEach(nextObj -> mgr.next(DEV1, nextObj));
 
         // Wait for the pipeline operation to complete
-        int expectedTime = (BOUND + OFFSET) * 8;
+        int expectedTime = (bound + offset) * 8;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectNextObjs.size(), actualObjs.size()));
 
         assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(NEXT3));
@@ -276,7 +322,7 @@
     //       pendingForwards, pendingNexts and ordering queue caches.
     @Test
     @Ignore("Not supported")
-    public void nextPending() throws Exception {
+    public void nextPending() {
         // Note: current logic will double check if the next obj need to be queued
         //       it does not check when resubmitting pending next back to the queue
         expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(null).times(6);
@@ -286,7 +332,7 @@
         expectNextObjsPending.forEach(nextObj -> mgr.next(DEV1, nextObj));
 
         // Wait for the pipeline operation to complete
-        int expectedTime = (BOUND + OFFSET) * 8;
+        int expectedTime = (bound + offset) * 8;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectNextObjs.size(), actualObjs.size()));
 
         assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(NEXT5));