Avoid head of queue blocking in InOrderFlowObjecitveManager

Change-Id: Ifec6f861634a0a7c335d0591861391b03c36f854
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 823891f..f32bb6c 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -16,6 +16,9 @@
 
 package org.onosproject.net.flowobjective.impl;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimaps;
@@ -43,13 +46,25 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
 
 @Component(immediate = true, enabled = true)
 @Service
 public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    // TODO Making these cache and timeout the entries
+    // TODO Make queue timeout configurable
+    static final int OBJ_TIMEOUT_MS = 5000;
+
+    private Cache<FiltObjQueueKey, Objective> filtObjQueueHead;
+    private Cache<FwdObjQueueKey, Objective> fwdObjQueueHead;
+    private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
+    private ScheduledExecutorService cacheCleaner;
+
     private ListMultimap<FiltObjQueueKey, Objective> filtObjQueue =
             Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
     private ListMultimap<FwdObjQueueKey, Objective> fwdObjQueue =
@@ -62,14 +77,77 @@
     @Activate
     protected void activate() {
         super.activate();
+
+        // TODO Clean up duplicated code
+        filtObjQueueHead = CacheBuilder.newBuilder()
+                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .removalListener((RemovalNotification<FiltObjQueueKey, Objective> notification) -> {
+                    Objective obj = notification.getValue();
+                    switch (notification.getCause()) {
+                        case EXPIRED:
+                        case COLLECTED:
+                        case SIZE:
+                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+                            break;
+                        case EXPLICIT: // No action when the objective completes correctly
+                        case REPLACED: // No action when a pending forward or next objective gets executed
+                        default:
+                            break;
+                    }
+                }).build();
+        fwdObjQueueHead = CacheBuilder.newBuilder()
+                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .removalListener((RemovalNotification<FwdObjQueueKey, Objective> notification) -> {
+                    Objective obj = notification.getValue();
+                    switch (notification.getCause()) {
+                        case EXPIRED:
+                        case COLLECTED:
+                        case SIZE:
+                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+                            break;
+                        case EXPLICIT: // No action when the objective completes correctly
+                        case REPLACED: // No action when a pending forward or next objective gets executed
+                        default:
+                            break;
+                    }
+                }).build();
+        nextObjQueueHead = CacheBuilder.newBuilder()
+                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
+                    Objective obj = notification.getValue();
+                    switch (notification.getCause()) {
+                        case EXPIRED:
+                        case COLLECTED:
+                        case SIZE:
+                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+                            break;
+                        case EXPLICIT: // No action when the objective completes correctly
+                        case REPLACED: // No action when a pending forward or next objective gets executed
+                        default:
+                            break;
+                    }
+                }).build();
+
+        cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
+        cacheCleaner.scheduleAtFixedRate(() -> {
+            filtObjQueueHead.cleanUp();
+            fwdObjQueueHead.cleanUp();
+            nextObjQueueHead.cleanUp();
+        }, 0, OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
         // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
-        // process()
+        // execute()
         flowObjectiveStore.unsetDelegate(super.delegate);
         flowObjectiveStore.setDelegate(delegate);
     }
 
     @Deactivate
     protected void deactivate() {
+        cacheCleaner.shutdown();
+        filtObjQueueHead.invalidateAll();
+        fwdObjQueueHead.invalidateAll();
+        nextObjQueueHead.invalidateAll();
+
         super.deactivate();
     }
 
@@ -93,7 +171,7 @@
             }
             @Override
             public void onError(Objective objective, ObjectiveError error) {
-                log.trace("Flow objective onError {}", objective);
+                log.warn("Flow objective onError {}", objective);
                 dequeue(deviceId, objective);
                 originalContext.ifPresent(c -> c.onError(objective, error));
             }
@@ -195,14 +273,17 @@
 
         if (obj instanceof FilteringObjective) {
             FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+            filtObjQueueHead.invalidate(k);
             filtObjQueue.remove(k, obj);
             remaining = filtObjQueue.get(k);
         } else if (obj instanceof ForwardingObjective) {
             FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+            fwdObjQueueHead.invalidate(k);
             fwdObjQueue.remove(k, obj);
             remaining = fwdObjQueue.get(k);
         } else if (obj instanceof NextObjective) {
             NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+            nextObjQueueHead.invalidate(k);
             nextObjQueue.remove(k, obj);
             remaining = nextObjQueue.get(k);
         } else {
@@ -228,11 +309,18 @@
         LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
         Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
 
+        int priority = obj.priority();
         if (obj instanceof FilteringObjective) {
+            FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+            filtObjQueueHead.put(k, obj);
             super.filter(deviceId, (FilteringObjective) obj);
         } else if (obj instanceof ForwardingObjective) {
+            FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+            fwdObjQueueHead.put(k, obj);
             super.forward(deviceId, (ForwardingObjective) obj);
         } else if (obj instanceof NextObjective) {
+            NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+            nextObjQueueHead.put(k, obj);
             super.next(deviceId, (NextObjective) obj);
         } else {
             log.error("Unknown flow objective instance: {}", obj.getClass().getName());
@@ -256,7 +344,7 @@
                 } else {
                     log.debug("Processing {} pending forwarding objectives for nextId {}",
                             pending.size(), event.subject());
-                    // resubmitted back to the execution queue
+                    // execute pending forwards one by one
                     pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
                 }
 
@@ -274,7 +362,7 @@
                 } else {
                     log.debug("Processing {} pending next objectives for nextId {}",
                             pendNexts.size(), event.subject());
-                    // resubmitted back to the execution queue
+                    // execute pending nexts one by one
                     pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
                 }
             }
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index bb89ced..7ce1c16 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -26,6 +26,7 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.MplsLabel;
 import org.onlab.packet.VlanId;
+import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.DefaultApplicationId;
 import org.onosproject.net.DeviceId;
@@ -33,6 +34,8 @@
 import org.onosproject.net.behaviour.NextGroup;
 import org.onosproject.net.behaviour.Pipeliner;
 import org.onosproject.net.behaviour.PipelinerAdapter;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.TrafficSelector;
@@ -46,6 +49,7 @@
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.flowobjective.ObjectiveEvent;
 
@@ -53,6 +57,7 @@
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
@@ -89,8 +94,10 @@
     private static final NextGroup NGRP2 = () -> new byte[] {0x02, 0x03};
 
     // Delay flow objectives OFFSET + rand(0, BOUND) millis
-    private static final int OFFSET = 10; // ms
-    private static final int BOUND = 40; // ms
+    private static final int DEFAULT_OFFSET = 10; // ms
+    private static final int DEFAULT_BOUND = 40; // ms
+    private static int offset = DEFAULT_OFFSET;
+    private static int bound = DEFAULT_BOUND;
 
     private static final FilteringObjective FILT1 = buildFilteringObjective(P2, V3, M3, 1).add();
     private static final FilteringObjective FILT2 = buildFilteringObjective(P2, V2, M2, 2).add();
@@ -124,6 +131,16 @@
     private List<ForwardingObjective> expectFwdObjs = Lists.newCopyOnWriteArrayList(
             Lists.newArrayList(FWD1, FWD2, FWD3, FWD4, FWD5, FWD6));
 
+    private static boolean timeout = false;
+    private static final ForwardingObjective FWD11 = buildFwdObjective(S1, NID2).add(new ObjectiveContext() {
+        @Override
+        public void onError(Objective objective, ObjectiveError error) {
+            timeout = error.equals(ObjectiveError.INSTALLATIONTIMEOUT);
+        }
+    });
+    private List<ForwardingObjective> expectFwdObjsTimeout = Lists.newCopyOnWriteArrayList(
+            Lists.newArrayList(FWD11, FWD1, FWD2));
+
     private List<Objective> actualObjs = Lists.newCopyOnWriteArrayList();
 
     private Pipeliner pipeliner = new PipelinerAdapter() {
@@ -163,7 +180,7 @@
          */
         private void recordObjective(Objective obj) {
             try {
-                Thread.sleep(new Random().nextInt(BOUND) + OFFSET);
+                Thread.sleep(new Random().nextInt(bound) + offset);
                 actualObjs.add(obj);
                 obj.context().ifPresent(c -> c.onSuccess(obj));
             } catch (Exception e) {
@@ -177,17 +194,25 @@
         mgr = new InOrderFlowObjectiveManager();
         mgr.pipeliners.put(DEV1, pipeliner);
         mgr.executorService = newFixedThreadPool(4, groupedThreads("foo", "bar"));
+        mgr.cfgService = createMock(ComponentConfigService.class);
+        mgr.deviceService = createMock(DeviceService.class);
+        mgr.driverService = createMock(DriverService.class);
         mgr.flowObjectiveStore = createMock(FlowObjectiveStore.class);
+        mgr.activate();
 
+        reset(mgr.flowObjectiveStore);
+        timeout = false;
+        offset = DEFAULT_OFFSET;
+        bound = DEFAULT_BOUND;
         actualObjs.clear();
     }
 
     @Test
-    public void filter() throws Exception {
+    public void filter() {
         expectFiltObjs.forEach(filtObj -> mgr.filter(DEV1, filtObj));
 
         // Wait for the pipeline operation to complete
-        int expectedTime = (BOUND + OFFSET) * 7;
+        int expectedTime = (bound + offset) * 7;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFiltObjs.size(), actualObjs.size()));
 
         assertTrue(actualObjs.indexOf(FILT1) < actualObjs.indexOf(FILT2));
@@ -198,7 +223,7 @@
     }
 
     @Test
-    public void forward() throws Exception {
+    public void forward() {
         expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(3);
         expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(3);
         replay(mgr.flowObjectiveStore);
@@ -206,7 +231,7 @@
         expectFwdObjs.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));
 
         // Wait for the pipeline operation to complete
-        int expectedTime = (BOUND + OFFSET) * 6;
+        int expectedTime = (bound + offset) * 6;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjs.size(), actualObjs.size()));
 
         assertTrue(actualObjs.indexOf(FWD1) < actualObjs.indexOf(FWD3));
@@ -218,7 +243,28 @@
     }
 
     @Test
-    public void forwardPending() throws Exception {
+    public void forwardTimeout() {
+        expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(1);
+        expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(2);
+        replay(mgr.flowObjectiveStore);
+
+        // Force this objective to time out
+        offset = InOrderFlowObjectiveManager.OBJ_TIMEOUT_MS + 500;
+
+        expectFwdObjsTimeout.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));
+
+        // Wait for the pipeline operation to complete
+        int expectedTime = (bound + offset) * 3;
+        assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjsTimeout.size(), actualObjs.size()));
+
+        assertTrue(timeout);
+        assertTrue(actualObjs.indexOf(FWD11) < actualObjs.indexOf(FWD1));
+
+        verify(mgr.flowObjectiveStore);
+    }
+
+    @Test
+    public void forwardPending() {
         // Note: current logic will double check if the next obj need to be queued
         //       it does not check when resubmitting pending next back to the queue
         expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(null).times(2);
@@ -234,7 +280,7 @@
         mgr.next(DEV1, NEXT2);
 
         // Wait for the pipeline operation to complete
-        int expectedTime = (BOUND + OFFSET) * 8;
+        int expectedTime = (bound + offset) * 8;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjs.size() + 2, actualObjs.size()));
 
         assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(FWD1));
@@ -248,7 +294,7 @@
     }
 
     @Test
-    public void next() throws Exception {
+    public void next() {
         // Note: ADD operation won't query this
         expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(3);
         expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(3);
@@ -257,7 +303,7 @@
         expectNextObjs.forEach(nextObj -> mgr.next(DEV1, nextObj));
 
         // Wait for the pipeline operation to complete
-        int expectedTime = (BOUND + OFFSET) * 8;
+        int expectedTime = (bound + offset) * 8;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectNextObjs.size(), actualObjs.size()));
 
         assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(NEXT3));
@@ -276,7 +322,7 @@
     //       pendingForwards, pendingNexts and ordering queue caches.
     @Test
     @Ignore("Not supported")
-    public void nextPending() throws Exception {
+    public void nextPending() {
         // Note: current logic will double check if the next obj need to be queued
         //       it does not check when resubmitting pending next back to the queue
         expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(null).times(6);
@@ -286,7 +332,7 @@
         expectNextObjsPending.forEach(nextObj -> mgr.next(DEV1, nextObj));
 
         // Wait for the pipeline operation to complete
-        int expectedTime = (BOUND + OFFSET) * 8;
+        int expectedTime = (bound + offset) * 8;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectNextObjs.size(), actualObjs.size()));
 
         assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(NEXT5));