Address StackOverflow by offloading cache listener to a separate thread
In addition,
- Address Thomas' comment in gerrit #20442
- Clean up redundant code
Change-Id: Iafb20895d3b338431c7faf9b810b6a6f6b055d92
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
index 8ec1bce3..acf09b0 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
@@ -24,7 +24,7 @@
/**
* Filtering objective queue key.
*/
-public class FilteringObjQueueKey {
+public class FilteringObjQueueKey implements ObjectiveQueueKey {
private DeviceId deviceId;
private int priority;
private Criterion key;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
index d963d8a..fa0c1a4 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
@@ -24,7 +24,7 @@
/**
* Forwarding objective queue key.
*/
-public class ForwardingObjQueueKey {
+public class ForwardingObjQueueKey implements ObjectiveQueueKey {
private DeviceId deviceId;
private int priority;
private TrafficSelector selector;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
index a2d9c64..b49238f 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
@@ -23,7 +23,7 @@
/**
* Next objective queue key.
*/
-public class NextObjQueueKey {
+public class NextObjQueueKey implements ObjectiveQueueKey {
private DeviceId deviceId;
private int id;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java
new file mode 100644
index 0000000..30bcea0
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+/**
+ * Objective queue key.
+ */
+public interface ObjectiveQueueKey {
+
+}
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 16b0e69..3f4de99 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -18,7 +18,8 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalListeners;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
@@ -37,6 +38,7 @@
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.net.flowobjective.ObjectiveQueueKey;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -45,11 +47,14 @@
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
@@ -64,6 +69,9 @@
private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
private ScheduledExecutorService cacheCleaner;
+ private ExecutorService filtCacheEventExecutor;
+ private ExecutorService fwdCacheEventExecutor;
+ private ExecutorService nextCacheEventExecutor;
private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
@@ -78,55 +86,36 @@
protected void activate() {
super.activate();
- // TODO Clean up duplicated code
+ filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
+ fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
+ nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
+
+ RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
+ Objective obj = notification.getValue();
+ switch (notification.getCause()) {
+ case EXPIRED:
+ case COLLECTED:
+ case SIZE:
+ obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+ break;
+ case EXPLICIT: // No action when the objective completes correctly
+ case REPLACED: // No action when a pending forward or next objective gets executed
+ default:
+ break;
+ }
+ };
filtObjQueueHead = CacheBuilder.newBuilder()
.expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
- .removalListener((RemovalNotification<FilteringObjQueueKey, Objective> notification) -> {
- Objective obj = notification.getValue();
- switch (notification.getCause()) {
- case EXPIRED:
- case COLLECTED:
- case SIZE:
- obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
- break;
- case EXPLICIT: // No action when the objective completes correctly
- case REPLACED: // No action when a pending forward or next objective gets executed
- default:
- break;
- }
- }).build();
+ .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
+ .build();
fwdObjQueueHead = CacheBuilder.newBuilder()
.expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
- .removalListener((RemovalNotification<ForwardingObjQueueKey, Objective> notification) -> {
- Objective obj = notification.getValue();
- switch (notification.getCause()) {
- case EXPIRED:
- case COLLECTED:
- case SIZE:
- obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
- break;
- case EXPLICIT: // No action when the objective completes correctly
- case REPLACED: // No action when a pending forward or next objective gets executed
- default:
- break;
- }
- }).build();
+ .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
+ .build();
nextObjQueueHead = CacheBuilder.newBuilder()
.expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
- .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
- Objective obj = notification.getValue();
- switch (notification.getCause()) {
- case EXPIRED:
- case COLLECTED:
- case SIZE:
- obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
- break;
- case EXPLICIT: // No action when the objective completes correctly
- case REPLACED: // No action when a pending forward or next objective gets executed
- default:
- break;
- }
- }).build();
+ .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
+ .build();
cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
cacheCleaner.scheduleAtFixedRate(() -> {
@@ -146,6 +135,10 @@
cacheCleaner.shutdown();
clearQueue();
+ filtCacheEventExecutor.shutdown();
+ fwdCacheEventExecutor.shutdown();
+ nextCacheEventExecutor.shutdown();
+
super.deactivate();
}
@@ -318,7 +311,7 @@
if (obj instanceof FilteringObjective) {
FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
- if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+ if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
filtObjQueueHead.invalidate(k);
}
filtObjQueue.remove(k, obj);
@@ -326,7 +319,7 @@
} else if (obj instanceof ForwardingObjective) {
ForwardingObjQueueKey k =
new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
- if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+ if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
fwdObjQueueHead.invalidate(k);
}
fwdObjQueue.remove(k, obj);
@@ -349,7 +342,7 @@
}
}
NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
- if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+ if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
nextObjQueueHead.invalidate(k);
}
nextObjQueue.remove(k, obj);