Address StackOverflow by offloading cache listener to a separate thread

In addition,
- Address Thomas' comment in gerrit #20442
- Clean up redundant code and speed up unit tests

Change-Id: Iafb20895d3b338431c7faf9b810b6a6f6b055d92
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
index 8ec1bce3..acf09b0 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
@@ -24,7 +24,7 @@
 /**
  * Filtering objective queue key.
  */
-public class FilteringObjQueueKey {
+public class FilteringObjQueueKey implements ObjectiveQueueKey {
     private DeviceId deviceId;
     private int priority;
     private Criterion key;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
index d963d8a..fa0c1a4 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
@@ -24,7 +24,7 @@
 /**
  * Forwarding objective queue key.
  */
-public class ForwardingObjQueueKey {
+public class ForwardingObjQueueKey implements ObjectiveQueueKey {
     private DeviceId deviceId;
     private int priority;
     private TrafficSelector selector;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
index a2d9c64..b49238f 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
@@ -23,7 +23,7 @@
 /**
  * Next objective queue key.
  */
-public class NextObjQueueKey {
+public class NextObjQueueKey implements ObjectiveQueueKey {
     private DeviceId deviceId;
     private int id;
 
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java
new file mode 100644
index 0000000..30bcea0
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+/**
+ * Objective queue key.
+ */
+public interface ObjectiveQueueKey {
+
+}
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 4f70745..a050e07 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -18,7 +18,8 @@
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalListeners;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimaps;
@@ -40,16 +41,20 @@
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.net.flowobjective.ObjectiveQueueKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 
@@ -59,12 +64,15 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     // TODO Make queue timeout configurable
-    static final int OBJ_TIMEOUT_MS = 15000;
+    static int objTimeoutMs = 15000;
 
     private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
     private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
     private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
     private ScheduledExecutorService cacheCleaner;
+    private ExecutorService filtCacheEventExecutor;
+    private ExecutorService fwdCacheEventExecutor;
+    private ExecutorService nextCacheEventExecutor;
 
     private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
             Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
@@ -79,62 +87,43 @@
     protected void activate() {
         super.activate();
 
-        // TODO Clean up duplicated code
+        filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
+        fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
+        nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
+
+        RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
+            Objective obj = notification.getValue();
+            switch (notification.getCause()) {
+                case EXPIRED:
+                case COLLECTED:
+                case SIZE:
+                    obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+                    break;
+                case EXPLICIT: // No action when the objective completes correctly
+                case REPLACED: // No action when a pending forward or next objective gets executed
+                default:
+                    break;
+            }
+        };
         filtObjQueueHead = CacheBuilder.newBuilder()
-                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<FilteringObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
+                .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
+                .build();
         fwdObjQueueHead = CacheBuilder.newBuilder()
-                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<ForwardingObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
+                .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
+                .build();
         nextObjQueueHead = CacheBuilder.newBuilder()
-                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
+                .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
+                .build();
 
         cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
         cacheCleaner.scheduleAtFixedRate(() -> {
             filtObjQueueHead.cleanUp();
             fwdObjQueueHead.cleanUp();
             nextObjQueueHead.cleanUp();
-        }, 0, OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        }, 0, objTimeoutMs, TimeUnit.MILLISECONDS);
 
         // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
         // execute()
@@ -147,6 +136,10 @@
         cacheCleaner.shutdown();
         clearQueue();
 
+        filtCacheEventExecutor.shutdown();
+        fwdCacheEventExecutor.shutdown();
+        nextCacheEventExecutor.shutdown();
+
         super.deactivate();
     }
 
@@ -319,7 +312,7 @@
 
         if (obj instanceof FilteringObjective) {
             FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 filtObjQueueHead.invalidate(k);
             }
             filtObjQueue.remove(k, obj);
@@ -327,7 +320,7 @@
         } else if (obj instanceof ForwardingObjective) {
             ForwardingObjQueueKey k =
                     new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 fwdObjQueueHead.invalidate(k);
             }
             fwdObjQueue.remove(k, obj);
@@ -350,7 +343,7 @@
                 }
             }
             NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 nextObjQueueHead.invalidate(k);
             }
             nextObjQueue.remove(k, obj);
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index 7ce1c16..94096b5 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -66,7 +66,9 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class InOrderFlowObjectiveManagerTest {
     private InOrderFlowObjectiveManager mgr;
@@ -96,6 +98,7 @@
     // Delay flow objectives OFFSET + rand(0, BOUND) millis
     private static final int DEFAULT_OFFSET = 10; // ms
     private static final int DEFAULT_BOUND = 40; // ms
+    private static final int TIMEOUT_THRESH = 100; // ms
     private static int offset = DEFAULT_OFFSET;
     private static int bound = DEFAULT_BOUND;
 
@@ -131,16 +134,6 @@
     private List<ForwardingObjective> expectFwdObjs = Lists.newCopyOnWriteArrayList(
             Lists.newArrayList(FWD1, FWD2, FWD3, FWD4, FWD5, FWD6));
 
-    private static boolean timeout = false;
-    private static final ForwardingObjective FWD11 = buildFwdObjective(S1, NID2).add(new ObjectiveContext() {
-        @Override
-        public void onError(Objective objective, ObjectiveError error) {
-            timeout = error.equals(ObjectiveError.INSTALLATIONTIMEOUT);
-        }
-    });
-    private List<ForwardingObjective> expectFwdObjsTimeout = Lists.newCopyOnWriteArrayList(
-            Lists.newArrayList(FWD11, FWD1, FWD2));
-
     private List<Objective> actualObjs = Lists.newCopyOnWriteArrayList();
 
     private Pipeliner pipeliner = new PipelinerAdapter() {
@@ -201,7 +194,6 @@
         mgr.activate();
 
         reset(mgr.flowObjectiveStore);
-        timeout = false;
         offset = DEFAULT_OFFSET;
         bound = DEFAULT_BOUND;
         actualObjs.clear();
@@ -244,12 +236,28 @@
 
     @Test
     public void forwardTimeout() {
+        final AtomicInteger counter = new AtomicInteger(0);
+        ForwardingObjective fwdTimeout = buildFwdObjective(S1, NID2).add(new ObjectiveContext() {
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                if (Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
+                    counter.incrementAndGet();
+                }
+            }
+        });
+        List<ForwardingObjective> expectFwdObjsTimeout = Lists.newCopyOnWriteArrayList(
+                Lists.newArrayList(fwdTimeout, FWD1, FWD2));
+
+        // Reduce timeout so the unit test doesn't have to wait many seconds
+        InOrderFlowObjectiveManager.objTimeoutMs = TIMEOUT_THRESH;
+        setUp();
+
         expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(1);
         expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(2);
         replay(mgr.flowObjectiveStore);
 
         // Force this objective to time out
-        offset = InOrderFlowObjectiveManager.OBJ_TIMEOUT_MS + 500;
+        offset = InOrderFlowObjectiveManager.objTimeoutMs * 2;
 
         expectFwdObjsTimeout.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));
 
@@ -257,8 +265,8 @@
         int expectedTime = (bound + offset) * 3;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjsTimeout.size(), actualObjs.size()));
 
-        assertTrue(timeout);
-        assertTrue(actualObjs.indexOf(FWD11) < actualObjs.indexOf(FWD1));
+        assertTrue(counter.get() != 0);
+        assertTrue(actualObjs.indexOf(fwdTimeout) < actualObjs.indexOf(FWD1));
 
         verify(mgr.flowObjectiveStore);
     }