Address StackOverflow by offloading cache listener to a separate thread

In addition,
- Address Thomas' comment in gerrit #20442
- Clean up redundant code and speed up unit tests

Change-Id: Iafb20895d3b338431c7faf9b810b6a6f6b055d92
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 4f70745..a050e07 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -18,7 +18,8 @@
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalListeners;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimaps;
@@ -40,16 +41,20 @@
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.net.flowobjective.ObjectiveQueueKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 
@@ -59,12 +64,15 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     // TODO Make queue timeout configurable
-    static final int OBJ_TIMEOUT_MS = 15000;
+    static int objTimeoutMs = 15000;
 
     private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
     private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
     private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
     private ScheduledExecutorService cacheCleaner;
+    private ExecutorService filtCacheEventExecutor;
+    private ExecutorService fwdCacheEventExecutor;
+    private ExecutorService nextCacheEventExecutor;
 
     private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
             Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
@@ -79,62 +87,43 @@
     protected void activate() {
         super.activate();
 
-        // TODO Clean up duplicated code
+        filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
+        fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
+        nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
+
+        RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
+            Objective obj = notification.getValue();
+            switch (notification.getCause()) {
+                case EXPIRED:
+                case COLLECTED:
+                case SIZE:
+                    obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+                    break;
+                case EXPLICIT: // No action when the objective completes correctly
+                case REPLACED: // No action when a pending forward or next objective gets executed
+                default:
+                    break;
+            }
+        };
         filtObjQueueHead = CacheBuilder.newBuilder()
-                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<FilteringObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
+                .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
+                .build();
         fwdObjQueueHead = CacheBuilder.newBuilder()
-                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<ForwardingObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
+                .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
+                .build();
         nextObjQueueHead = CacheBuilder.newBuilder()
-                .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
+                .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
+                .build();
 
         cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
         cacheCleaner.scheduleAtFixedRate(() -> {
             filtObjQueueHead.cleanUp();
             fwdObjQueueHead.cleanUp();
             nextObjQueueHead.cleanUp();
-        }, 0, OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        }, 0, objTimeoutMs, TimeUnit.MILLISECONDS);
 
         // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
         // execute()
@@ -147,6 +136,10 @@
         cacheCleaner.shutdown();
         clearQueue();
 
+        filtCacheEventExecutor.shutdown();
+        fwdCacheEventExecutor.shutdown();
+        nextCacheEventExecutor.shutdown();
+
         super.deactivate();
     }
 
@@ -319,7 +312,7 @@
 
         if (obj instanceof FilteringObjective) {
             FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 filtObjQueueHead.invalidate(k);
             }
             filtObjQueue.remove(k, obj);
@@ -327,7 +320,7 @@
         } else if (obj instanceof ForwardingObjective) {
             ForwardingObjQueueKey k =
                     new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 fwdObjQueueHead.invalidate(k);
             }
             fwdObjQueue.remove(k, obj);
@@ -350,7 +343,7 @@
                 }
             }
             NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 nextObjQueueHead.invalidate(k);
             }
             nextObjQueue.remove(k, obj);
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index 7ce1c16..94096b5 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -66,7 +66,9 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class InOrderFlowObjectiveManagerTest {
     private InOrderFlowObjectiveManager mgr;
@@ -96,6 +98,7 @@
     // Delay flow objectives OFFSET + rand(0, BOUND) millis
     private static final int DEFAULT_OFFSET = 10; // ms
     private static final int DEFAULT_BOUND = 40; // ms
+    private static final int TIMEOUT_THRESH = 100; // ms
     private static int offset = DEFAULT_OFFSET;
     private static int bound = DEFAULT_BOUND;
 
@@ -131,16 +134,6 @@
     private List<ForwardingObjective> expectFwdObjs = Lists.newCopyOnWriteArrayList(
             Lists.newArrayList(FWD1, FWD2, FWD3, FWD4, FWD5, FWD6));
 
-    private static boolean timeout = false;
-    private static final ForwardingObjective FWD11 = buildFwdObjective(S1, NID2).add(new ObjectiveContext() {
-        @Override
-        public void onError(Objective objective, ObjectiveError error) {
-            timeout = error.equals(ObjectiveError.INSTALLATIONTIMEOUT);
-        }
-    });
-    private List<ForwardingObjective> expectFwdObjsTimeout = Lists.newCopyOnWriteArrayList(
-            Lists.newArrayList(FWD11, FWD1, FWD2));
-
     private List<Objective> actualObjs = Lists.newCopyOnWriteArrayList();
 
     private Pipeliner pipeliner = new PipelinerAdapter() {
@@ -201,7 +194,6 @@
         mgr.activate();
 
         reset(mgr.flowObjectiveStore);
-        timeout = false;
         offset = DEFAULT_OFFSET;
         bound = DEFAULT_BOUND;
         actualObjs.clear();
@@ -244,12 +236,28 @@
 
     @Test
     public void forwardTimeout() {
+        final AtomicInteger counter = new AtomicInteger(0);
+        ForwardingObjective fwdTimeout = buildFwdObjective(S1, NID2).add(new ObjectiveContext() {
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                if (Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
+                    counter.incrementAndGet();
+                }
+            }
+        });
+        List<ForwardingObjective> expectFwdObjsTimeout = Lists.newCopyOnWriteArrayList(
+                Lists.newArrayList(fwdTimeout, FWD1, FWD2));
+
+        // Reduce timeout so the unit test doesn't have to wait many seconds
+        InOrderFlowObjectiveManager.objTimeoutMs = TIMEOUT_THRESH;
+        setUp();
+
         expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(1);
         expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(2);
         replay(mgr.flowObjectiveStore);
 
         // Force this objective to time out
-        offset = InOrderFlowObjectiveManager.OBJ_TIMEOUT_MS + 500;
+        offset = InOrderFlowObjectiveManager.objTimeoutMs * 2;
 
         expectFwdObjsTimeout.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));
 
@@ -257,8 +265,8 @@
         int expectedTime = (bound + offset) * 3;
         assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjsTimeout.size(), actualObjs.size()));
 
-        assertTrue(timeout);
-        assertTrue(actualObjs.indexOf(FWD11) < actualObjs.indexOf(FWD1));
+        assertTrue(counter.get() != 0);
+        assertTrue(actualObjs.indexOf(fwdTimeout) < actualObjs.indexOf(FWD1));
 
         verify(mgr.flowObjectiveStore);
     }