Address StackOverflow by offloading cache listener to a separate thread
In addition,
- Address Thomas' comment in gerrit #20442
- Clean up redundant code and speed up unit tests
Change-Id: Iafb20895d3b338431c7faf9b810b6a6f6b055d92
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
index 8ec1bce3..acf09b0 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjQueueKey.java
@@ -24,7 +24,7 @@
/**
* Filtering objective queue key.
*/
-public class FilteringObjQueueKey {
+public class FilteringObjQueueKey implements ObjectiveQueueKey {
private DeviceId deviceId;
private int priority;
private Criterion key;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
index d963d8a..fa0c1a4 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjQueueKey.java
@@ -24,7 +24,7 @@
/**
* Forwarding objective queue key.
*/
-public class ForwardingObjQueueKey {
+public class ForwardingObjQueueKey implements ObjectiveQueueKey {
private DeviceId deviceId;
private int priority;
private TrafficSelector selector;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
index a2d9c64..b49238f 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjQueueKey.java
@@ -23,7 +23,7 @@
/**
* Next objective queue key.
*/
-public class NextObjQueueKey {
+public class NextObjQueueKey implements ObjectiveQueueKey {
private DeviceId deviceId;
private int id;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java
new file mode 100644
index 0000000..30bcea0
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveQueueKey.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+/**
+ * Objective queue key.
+ */
+public interface ObjectiveQueueKey {
+
+}
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 4f70745..a050e07 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -18,7 +18,8 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalListeners;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
@@ -40,16 +41,20 @@
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.net.flowobjective.ObjectiveQueueKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
@@ -59,12 +64,15 @@
private final Logger log = LoggerFactory.getLogger(getClass());
// TODO Make queue timeout configurable
- static final int OBJ_TIMEOUT_MS = 15000;
+ static int objTimeoutMs = 15000;
private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
private ScheduledExecutorService cacheCleaner;
+ private ExecutorService filtCacheEventExecutor;
+ private ExecutorService fwdCacheEventExecutor;
+ private ExecutorService nextCacheEventExecutor;
private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
@@ -79,62 +87,43 @@
protected void activate() {
super.activate();
- // TODO Clean up duplicated code
+ filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
+ fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
+ nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
+
+ RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
+ Objective obj = notification.getValue();
+ switch (notification.getCause()) {
+ case EXPIRED:
+ case COLLECTED:
+ case SIZE:
+ obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+ break;
+ case EXPLICIT: // No action when the objective completes correctly
+ case REPLACED: // No action when a pending forward or next objective gets executed
+ default:
+ break;
+ }
+ };
filtObjQueueHead = CacheBuilder.newBuilder()
- .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
- .removalListener((RemovalNotification<FilteringObjQueueKey, Objective> notification) -> {
- Objective obj = notification.getValue();
- switch (notification.getCause()) {
- case EXPIRED:
- case COLLECTED:
- case SIZE:
- obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
- break;
- case EXPLICIT: // No action when the objective completes correctly
- case REPLACED: // No action when a pending forward or next objective gets executed
- default:
- break;
- }
- }).build();
+ .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
+ .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
+ .build();
fwdObjQueueHead = CacheBuilder.newBuilder()
- .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
- .removalListener((RemovalNotification<ForwardingObjQueueKey, Objective> notification) -> {
- Objective obj = notification.getValue();
- switch (notification.getCause()) {
- case EXPIRED:
- case COLLECTED:
- case SIZE:
- obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
- break;
- case EXPLICIT: // No action when the objective completes correctly
- case REPLACED: // No action when a pending forward or next objective gets executed
- default:
- break;
- }
- }).build();
+ .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
+ .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
+ .build();
nextObjQueueHead = CacheBuilder.newBuilder()
- .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
- .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
- Objective obj = notification.getValue();
- switch (notification.getCause()) {
- case EXPIRED:
- case COLLECTED:
- case SIZE:
- obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
- break;
- case EXPLICIT: // No action when the objective completes correctly
- case REPLACED: // No action when a pending forward or next objective gets executed
- default:
- break;
- }
- }).build();
+ .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
+ .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
+ .build();
cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
cacheCleaner.scheduleAtFixedRate(() -> {
filtObjQueueHead.cleanUp();
fwdObjQueueHead.cleanUp();
nextObjQueueHead.cleanUp();
- }, 0, OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ }, 0, objTimeoutMs, TimeUnit.MILLISECONDS);
// Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
// execute()
@@ -147,6 +136,10 @@
cacheCleaner.shutdown();
clearQueue();
+ filtCacheEventExecutor.shutdown();
+ fwdCacheEventExecutor.shutdown();
+ nextCacheEventExecutor.shutdown();
+
super.deactivate();
}
@@ -319,7 +312,7 @@
if (obj instanceof FilteringObjective) {
FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
- if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+ if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
filtObjQueueHead.invalidate(k);
}
filtObjQueue.remove(k, obj);
@@ -327,7 +320,7 @@
} else if (obj instanceof ForwardingObjective) {
ForwardingObjQueueKey k =
new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
- if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+ if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
fwdObjQueueHead.invalidate(k);
}
fwdObjQueue.remove(k, obj);
@@ -350,7 +343,7 @@
}
}
NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
- if (!ObjectiveError.INSTALLATIONTIMEOUT.equals(error)) {
+ if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
nextObjQueueHead.invalidate(k);
}
nextObjQueue.remove(k, obj);
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index 7ce1c16..94096b5 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -66,7 +66,9 @@
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
public class InOrderFlowObjectiveManagerTest {
private InOrderFlowObjectiveManager mgr;
@@ -96,6 +98,7 @@
// Delay flow objectives OFFSET + rand(0, BOUND) millis
private static final int DEFAULT_OFFSET = 10; // ms
private static final int DEFAULT_BOUND = 40; // ms
+ private static final int TIMEOUT_THRESH = 100; // ms
private static int offset = DEFAULT_OFFSET;
private static int bound = DEFAULT_BOUND;
@@ -131,16 +134,6 @@
private List<ForwardingObjective> expectFwdObjs = Lists.newCopyOnWriteArrayList(
Lists.newArrayList(FWD1, FWD2, FWD3, FWD4, FWD5, FWD6));
- private static boolean timeout = false;
- private static final ForwardingObjective FWD11 = buildFwdObjective(S1, NID2).add(new ObjectiveContext() {
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- timeout = error.equals(ObjectiveError.INSTALLATIONTIMEOUT);
- }
- });
- private List<ForwardingObjective> expectFwdObjsTimeout = Lists.newCopyOnWriteArrayList(
- Lists.newArrayList(FWD11, FWD1, FWD2));
-
private List<Objective> actualObjs = Lists.newCopyOnWriteArrayList();
private Pipeliner pipeliner = new PipelinerAdapter() {
@@ -201,7 +194,6 @@
mgr.activate();
reset(mgr.flowObjectiveStore);
- timeout = false;
offset = DEFAULT_OFFSET;
bound = DEFAULT_BOUND;
actualObjs.clear();
@@ -244,12 +236,28 @@
@Test
public void forwardTimeout() {
+ final AtomicInteger counter = new AtomicInteger(0);
+ ForwardingObjective fwdTimeout = buildFwdObjective(S1, NID2).add(new ObjectiveContext() {
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ if (Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
+ counter.incrementAndGet();
+ }
+ }
+ });
+ List<ForwardingObjective> expectFwdObjsTimeout = Lists.newCopyOnWriteArrayList(
+ Lists.newArrayList(fwdTimeout, FWD1, FWD2));
+
+ // Reduce timeout so the unit test doesn't have to wait many seconds
+ InOrderFlowObjectiveManager.objTimeoutMs = TIMEOUT_THRESH;
+ setUp();
+
expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(1);
expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(2);
replay(mgr.flowObjectiveStore);
// Force this objective to time out
- offset = InOrderFlowObjectiveManager.OBJ_TIMEOUT_MS + 500;
+ offset = InOrderFlowObjectiveManager.objTimeoutMs * 2;
expectFwdObjsTimeout.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));
@@ -257,8 +265,8 @@
int expectedTime = (bound + offset) * 3;
assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjsTimeout.size(), actualObjs.size()));
- assertTrue(timeout);
- assertTrue(actualObjs.indexOf(FWD11) < actualObjs.indexOf(FWD1));
+ assertTrue(counter.get() != 0);
+ assertTrue(actualObjs.indexOf(fwdTimeout) < actualObjs.indexOf(FWD1));
verify(mgr.flowObjectiveStore);
}