Address StackOverflow by offloading cache listener to a separate thread

In addition,
- Address Thomas' comment in gerrit #20442
- Clean up redundant code

Change-Id: Iafb20895d3b338431c7faf9b810b6a6f6b055d92
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 16b0e69..3f4de99 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -18,7 +18,8 @@
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalListeners;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimaps;
@@ -37,6 +38,7 @@
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.net.flowobjective.ObjectiveQueueKey;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -45,11 +47,14 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 
@@ -64,6 +69,9 @@
     private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
     private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
     private ScheduledExecutorService cacheCleaner;
+    private ExecutorService filtCacheEventExecutor;
+    private ExecutorService fwdCacheEventExecutor;
+    private ExecutorService nextCacheEventExecutor;
 
     private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
             Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
@@ -78,55 +86,36 @@
     protected void activate() {
         super.activate();
 
-        // TODO Clean up duplicated code
+        filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
+        fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
+        nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
+
+        RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
+            Objective obj = notification.getValue();
+            switch (notification.getCause()) {
+                case EXPIRED:
+                case COLLECTED:
+                case SIZE:
+                    obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+                    break;
+                case EXPLICIT: // No action when the objective completes correctly
+                case REPLACED: // No action when a pending forward or next objective gets executed
+                default:
+                    break;
+            }
+        };
         filtObjQueueHead = CacheBuilder.newBuilder()
                 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<FilteringObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
+                .build();
         fwdObjQueueHead = CacheBuilder.newBuilder()
                 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<ForwardingObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
+                .build();
         nextObjQueueHead = CacheBuilder.newBuilder()
                 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
+                .build();
 
         cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
         cacheCleaner.scheduleAtFixedRate(() -> {
@@ -146,6 +135,10 @@
         cacheCleaner.shutdown();
         clearQueue();
 
+        filtCacheEventExecutor.shutdown();
+        fwdCacheEventExecutor.shutdown();
+        nextCacheEventExecutor.shutdown();
+
         super.deactivate();
     }
 
@@ -318,7 +311,7 @@
 
         if (obj instanceof FilteringObjective) {
             FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 filtObjQueueHead.invalidate(k);
             }
             filtObjQueue.remove(k, obj);
@@ -326,7 +319,7 @@
         } else if (obj instanceof ForwardingObjective) {
             ForwardingObjQueueKey k =
                     new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 fwdObjQueueHead.invalidate(k);
             }
             fwdObjQueue.remove(k, obj);
@@ -349,7 +342,7 @@
                 }
             }
             NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 nextObjQueueHead.invalidate(k);
             }
             nextObjQueue.remove(k, obj);