Address StackOverflow by offloading cache listener to a separate thread

In addition,
- Address Thomas' comment in gerrit #20442
- Clean up redundant code

Change-Id: Iafb20895d3b338431c7faf9b810b6a6f6b055d92
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
index 8ec1bce3..acf09b0 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
@@ -24,7 +24,7 @@
 /**
  * Filtering objective queue key.
  */
-public class FilteringObjQueueKey {
+public class FilteringObjQueueKey implements ObjectiveQueueKey {
     private DeviceId deviceId;
     private int priority;
     private Criterion key;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
index d963d8a..fa0c1a4 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
@@ -24,7 +24,7 @@
 /**
  * Forwarding objective queue key.
  */
-public class ForwardingObjQueueKey {
+public class ForwardingObjQueueKey implements ObjectiveQueueKey {
     private DeviceId deviceId;
     private int priority;
     private TrafficSelector selector;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
index a2d9c64..b49238f 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
@@ -23,7 +23,7 @@
 /**
  * Next objective queue key.
  */
-public class NextObjQueueKey {
+public class NextObjQueueKey implements ObjectiveQueueKey {
     private DeviceId deviceId;
     private int id;
 
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java
new file mode 100644
index 0000000..30bcea0
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+/**
+ * Objective queue key.
+ */
+public interface ObjectiveQueueKey {
+
+}
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 16b0e69..3f4de99 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -18,7 +18,8 @@
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalListeners;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimaps;
@@ -37,6 +38,7 @@
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.net.flowobjective.ObjectiveQueueKey;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -45,11 +47,14 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 
@@ -64,6 +69,9 @@
     private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
     private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
     private ScheduledExecutorService cacheCleaner;
+    private ExecutorService filtCacheEventExecutor;
+    private ExecutorService fwdCacheEventExecutor;
+    private ExecutorService nextCacheEventExecutor;
 
     private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
             Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
@@ -78,55 +86,36 @@
     protected void activate() {
         super.activate();
 
-        // TODO Clean up duplicated code
+        filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
+        fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
+        nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
+
+        RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
+            Objective obj = notification.getValue();
+            switch (notification.getCause()) {
+                case EXPIRED:
+                case COLLECTED:
+                case SIZE:
+                    obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+                    break;
+                case EXPLICIT: // No action when the objective completes correctly
+                case REPLACED: // No action when a pending forward or next objective gets executed
+                default:
+                    break;
+            }
+        };
         filtObjQueueHead = CacheBuilder.newBuilder()
                 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<FilteringObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
+                .build();
         fwdObjQueueHead = CacheBuilder.newBuilder()
                 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<ForwardingObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
+                .build();
         nextObjQueueHead = CacheBuilder.newBuilder()
                 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
-                    Objective obj = notification.getValue();
-                    switch (notification.getCause()) {
-                        case EXPIRED:
-                        case COLLECTED:
-                        case SIZE:
-                            obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
-                            break;
-                        case EXPLICIT: // No action when the objective completes correctly
-                        case REPLACED: // No action when a pending forward or next objective gets executed
-                        default:
-                            break;
-                    }
-                }).build();
+                .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
+                .build();
 
         cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
         cacheCleaner.scheduleAtFixedRate(() -> {
@@ -146,6 +135,10 @@
         cacheCleaner.shutdown();
         clearQueue();
 
+        filtCacheEventExecutor.shutdown();
+        fwdCacheEventExecutor.shutdown();
+        nextCacheEventExecutor.shutdown();
+
         super.deactivate();
     }
 
@@ -318,7 +311,7 @@
 
         if (obj instanceof FilteringObjective) {
             FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 filtObjQueueHead.invalidate(k);
             }
             filtObjQueue.remove(k, obj);
@@ -326,7 +319,7 @@
         } else if (obj instanceof ForwardingObjective) {
             ForwardingObjQueueKey k =
                     new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 fwdObjQueueHead.invalidate(k);
             }
             fwdObjQueue.remove(k, obj);
@@ -349,7 +342,7 @@
                 }
             }
             NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
-            if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+            if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
                 nextObjQueueHead.invalidate(k);
             }
             nextObjQueue.remove(k, obj);