Increase flow obj timeout and remove pendingForwards and pendingNexts when the next failed

Change-Id: I6a1fde50db9184d87af20e3c605ff5b697e082b6
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 3872702..7ded0f5 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -59,7 +59,7 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     // TODO Make queue timeout configurable
-    static final int OBJ_TIMEOUT_MS = 5000;
+    static final int OBJ_TIMEOUT_MS = 15000;
 
     private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
     private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
@@ -165,13 +165,13 @@
             @Override
             public void onSuccess(Objective objective) {
                 log.trace("Flow objective onSuccess {}", objective);
-                dequeue(deviceId, objective);
+                dequeue(deviceId, objective, null);
                 originalContext.ifPresent(c -> c.onSuccess(objective));
             }
             @Override
             public void onError(Objective objective, ObjectiveError error) {
-                log.warn("Flow objective onError {}. {}", objective, error);
-                dequeue(deviceId, objective);
+                log.warn("Flow objective onError {}. Reason = {}", objective, error);
+                dequeue(deviceId, objective, error);
                 originalContext.ifPresent(c -> c.onError(objective, error));
             }
         };
@@ -308,8 +308,9 @@
      *
      * @param deviceId Device ID
      * @param obj Flow objective
+     * @param error ObjectiveError that triggers this dequeue. Null if this is not triggered by an error.
      */
-    private synchronized void dequeue(DeviceId deviceId, Objective obj) {
+    private synchronized void dequeue(DeviceId deviceId, Objective obj, ObjectiveError error) {
         List<Objective> remaining;
         int priority = obj.priority();
 
@@ -328,6 +329,22 @@
             fwdObjQueue.remove(k, obj);
             remaining = fwdObjQueue.get(k);
         } else if (obj instanceof NextObjective) {
+            if (error != null) {
+                // Remove pendingForwards and pendingNexts if next objective failed
+                Set<PendingFlowObjective> removedForwards = pendingForwards.remove(obj.id());
+                List<PendingFlowObjective> removedNexts = pendingNexts.remove(obj.id());
+
+                if (removedForwards != null) {
+                    removedForwards.stream().map(PendingFlowObjective::flowObjective)
+                            .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
+                                    c.onError(pendingObj, error)));
+                }
+                if (removedNexts != null) {
+                    removedNexts.stream().map(PendingFlowObjective::flowObjective)
+                            .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
+                                    c.onError(pendingObj, error)));
+                }
+            }
             NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
             nextObjQueueHead.invalidate(k);
             nextObjQueue.remove(k, obj);