Avoid head of queue blocking in InOrderFlowObjecitveManager

Change-Id: Ifec6f861634a0a7c335d0591861391b03c36f854
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 823891f..f32bb6c 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -16,6 +16,9 @@
 
 package org.onosproject.net.flowobjective.impl;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimaps;
@@ -43,13 +46,25 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
 
 @Component(immediate = true, enabled = true)
 @Service
 public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    // TODO Making these cache and timeout the entries
+    // TODO Make queue timeout configurable
+    static final int OBJ_TIMEOUT_MS = 5000;
+
+    private Cache<FiltObjQueueKey, Objective> filtObjQueueHead;
+    private Cache<FwdObjQueueKey, Objective> fwdObjQueueHead;
+    private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
+    private ScheduledExecutorService cacheCleaner;
+
     private ListMultimap<FiltObjQueueKey, Objective> filtObjQueue =
             Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
     private ListMultimap<FwdObjQueueKey, Objective> fwdObjQueue =
@@ -62,14 +77,77 @@
     @Activate
     protected void activate() {
         super.activate();
+
+        // TODO Clean up duplicated code
+        filtObjQueueHead = CacheBuilder.newBuilder()
+                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .removalListener((RemovalNotification<FiltObjQueueKey, Objective> notification) -> {
+                    Objective obj = notification.getValue();
+                    switch (notification.getCause()) {
+                        case EXPIRED:
+                        case COLLECTED:
+                        case SIZE:
+                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+                            break;
+                        case EXPLICIT: // No action when the objective completes correctly
+                        case REPLACED: // No action when a pending forward or next objective gets executed
+                        default:
+                            break;
+                    }
+                }).build();
+        fwdObjQueueHead = CacheBuilder.newBuilder()
+                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .removalListener((RemovalNotification<FwdObjQueueKey, Objective> notification) -> {
+                    Objective obj = notification.getValue();
+                    switch (notification.getCause()) {
+                        case EXPIRED:
+                        case COLLECTED:
+                        case SIZE:
+                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+                            break;
+                        case EXPLICIT: // No action when the objective completes correctly
+                        case REPLACED: // No action when a pending forward or next objective gets executed
+                        default:
+                            break;
+                    }
+                }).build();
+        nextObjQueueHead = CacheBuilder.newBuilder()
+                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
+                    Objective obj = notification.getValue();
+                    switch (notification.getCause()) {
+                        case EXPIRED:
+                        case COLLECTED:
+                        case SIZE:
+                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+                            break;
+                        case EXPLICIT: // No action when the objective completes correctly
+                        case REPLACED: // No action when a pending forward or next objective gets executed
+                        default:
+                            break;
+                    }
+                }).build();
+
+        cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
+        cacheCleaner.scheduleAtFixedRate(() -> {
+            filtObjQueueHead.cleanUp();
+            fwdObjQueueHead.cleanUp();
+            nextObjQueueHead.cleanUp();
+        }, 0, OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
         // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
-        // process()
+        // execute()
         flowObjectiveStore.unsetDelegate(super.delegate);
         flowObjectiveStore.setDelegate(delegate);
     }
 
     @Deactivate
     protected void deactivate() {
+        cacheCleaner.shutdown();
+        filtObjQueueHead.invalidateAll();
+        fwdObjQueueHead.invalidateAll();
+        nextObjQueueHead.invalidateAll();
+
         super.deactivate();
     }
 
@@ -93,7 +171,7 @@
             }
             @Override
             public void onError(Objective objective, ObjectiveError error) {
-                log.trace("Flow objective onError {}", objective);
+                log.warn("Flow objective onError {}", objective);
                 dequeue(deviceId, objective);
                 originalContext.ifPresent(c -> c.onError(objective, error));
             }
@@ -195,14 +273,17 @@
 
         if (obj instanceof FilteringObjective) {
             FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+            filtObjQueueHead.invalidate(k);
             filtObjQueue.remove(k, obj);
             remaining = filtObjQueue.get(k);
         } else if (obj instanceof ForwardingObjective) {
             FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+            fwdObjQueueHead.invalidate(k);
             fwdObjQueue.remove(k, obj);
             remaining = fwdObjQueue.get(k);
         } else if (obj instanceof NextObjective) {
             NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+            nextObjQueueHead.invalidate(k);
             nextObjQueue.remove(k, obj);
             remaining = nextObjQueue.get(k);
         } else {
@@ -228,11 +309,18 @@
         LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
         Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
 
+        int priority = obj.priority();
         if (obj instanceof FilteringObjective) {
+            FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+            filtObjQueueHead.put(k, obj);
             super.filter(deviceId, (FilteringObjective) obj);
         } else if (obj instanceof ForwardingObjective) {
+            FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+            fwdObjQueueHead.put(k, obj);
             super.forward(deviceId, (ForwardingObjective) obj);
         } else if (obj instanceof NextObjective) {
+            NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+            nextObjQueueHead.put(k, obj);
             super.next(deviceId, (NextObjective) obj);
         } else {
             log.error("Unknown flow objective instance: {}", obj.getClass().getName());
@@ -256,7 +344,7 @@
                 } else {
                     log.debug("Processing {} pending forwarding objectives for nextId {}",
                             pending.size(), event.subject());
-                    // resubmitted back to the execution queue
+                    // execute pending forwards one by one
                     pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
                 }
 
@@ -274,7 +362,7 @@
                 } else {
                     log.debug("Processing {} pending next objectives for nextId {}",
                             pendNexts.size(), event.subject());
-                    // resubmitted back to the execution queue
+                    // execute pending nexts one by one
                     pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
                 }
             }