Avoid head of queue blocking in InOrderFlowObjecitveManager
Change-Id: Ifec6f861634a0a7c335d0591861391b03c36f854
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 823891f..f32bb6c 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -16,6 +16,9 @@
package org.onosproject.net.flowobjective.impl;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
@@ -43,13 +46,25 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
@Component(immediate = true, enabled = true)
@Service
public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
private final Logger log = LoggerFactory.getLogger(getClass());
- // TODO Making these cache and timeout the entries
+ // TODO Make queue timeout configurable
+ static final int OBJ_TIMEOUT_MS = 5000;
+
+ private Cache<FiltObjQueueKey, Objective> filtObjQueueHead;
+ private Cache<FwdObjQueueKey, Objective> fwdObjQueueHead;
+ private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
+ private ScheduledExecutorService cacheCleaner;
+
private ListMultimap<FiltObjQueueKey, Objective> filtObjQueue =
Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
private ListMultimap<FwdObjQueueKey, Objective> fwdObjQueue =
@@ -62,14 +77,77 @@
@Activate
protected void activate() {
super.activate();
+
+ // TODO Clean up duplicated code
+ filtObjQueueHead = CacheBuilder.newBuilder()
+ .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .removalListener((RemovalNotification<FiltObjQueueKey, Objective> notification) -> {
+ Objective obj = notification.getValue();
+ switch (notification.getCause()) {
+ case EXPIRED:
+ case COLLECTED:
+ case SIZE:
+ obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+ break;
+ case EXPLICIT: // No action when the objective completes correctly
+ case REPLACED: // No action when a pending forward or next objective gets executed
+ default:
+ break;
+ }
+ }).build();
+ fwdObjQueueHead = CacheBuilder.newBuilder()
+ .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .removalListener((RemovalNotification<FwdObjQueueKey, Objective> notification) -> {
+ Objective obj = notification.getValue();
+ switch (notification.getCause()) {
+ case EXPIRED:
+ case COLLECTED:
+ case SIZE:
+ obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+ break;
+ case EXPLICIT: // No action when the objective completes correctly
+ case REPLACED: // No action when a pending forward or next objective gets executed
+ default:
+ break;
+ }
+ }).build();
+ nextObjQueueHead = CacheBuilder.newBuilder()
+ .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
+ Objective obj = notification.getValue();
+ switch (notification.getCause()) {
+ case EXPIRED:
+ case COLLECTED:
+ case SIZE:
+ obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+ break;
+ case EXPLICIT: // No action when the objective completes correctly
+ case REPLACED: // No action when a pending forward or next objective gets executed
+ default:
+ break;
+ }
+ }).build();
+
+ cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
+ cacheCleaner.scheduleAtFixedRate(() -> {
+ filtObjQueueHead.cleanUp();
+ fwdObjQueueHead.cleanUp();
+ nextObjQueueHead.cleanUp();
+ }, 0, OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
// Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
- // process()
+ // execute()
flowObjectiveStore.unsetDelegate(super.delegate);
flowObjectiveStore.setDelegate(delegate);
}
@Deactivate
protected void deactivate() {
+ cacheCleaner.shutdown();
+ filtObjQueueHead.invalidateAll();
+ fwdObjQueueHead.invalidateAll();
+ nextObjQueueHead.invalidateAll();
+
super.deactivate();
}
@@ -93,7 +171,7 @@
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.trace("Flow objective onError {}", objective);
+ log.warn("Flow objective onError {}", objective);
dequeue(deviceId, objective);
originalContext.ifPresent(c -> c.onError(objective, error));
}
@@ -195,14 +273,17 @@
if (obj instanceof FilteringObjective) {
FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+ filtObjQueueHead.invalidate(k);
filtObjQueue.remove(k, obj);
remaining = filtObjQueue.get(k);
} else if (obj instanceof ForwardingObjective) {
FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+ fwdObjQueueHead.invalidate(k);
fwdObjQueue.remove(k, obj);
remaining = fwdObjQueue.get(k);
} else if (obj instanceof NextObjective) {
NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+ nextObjQueueHead.invalidate(k);
nextObjQueue.remove(k, obj);
remaining = nextObjQueue.get(k);
} else {
@@ -228,11 +309,18 @@
LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
+ int priority = obj.priority();
if (obj instanceof FilteringObjective) {
+ FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+ filtObjQueueHead.put(k, obj);
super.filter(deviceId, (FilteringObjective) obj);
} else if (obj instanceof ForwardingObjective) {
+ FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+ fwdObjQueueHead.put(k, obj);
super.forward(deviceId, (ForwardingObjective) obj);
} else if (obj instanceof NextObjective) {
+ NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+ nextObjQueueHead.put(k, obj);
super.next(deviceId, (NextObjective) obj);
} else {
log.error("Unknown flow objective instance: {}", obj.getClass().getName());
@@ -256,7 +344,7 @@
} else {
log.debug("Processing {} pending forwarding objectives for nextId {}",
pending.size(), event.subject());
- // resubmitted back to the execution queue
+ // execute pending forwards one by one
pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
}
@@ -274,7 +362,7 @@
} else {
log.debug("Processing {} pending next objectives for nextId {}",
pendNexts.size(), event.subject());
- // resubmitted back to the execution queue
+ // execute pending nexts one by one
pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
}
}
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index bb89ced..7ce1c16 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -26,6 +26,7 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.MplsLabel;
import org.onlab.packet.VlanId;
+import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.net.DeviceId;
@@ -33,6 +34,8 @@
import org.onosproject.net.behaviour.NextGroup;
import org.onosproject.net.behaviour.Pipeliner;
import org.onosproject.net.behaviour.PipelinerAdapter;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector;
@@ -46,6 +49,7 @@
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.flowobjective.ObjectiveEvent;
@@ -53,6 +57,7 @@
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
@@ -89,8 +94,10 @@
private static final NextGroup NGRP2 = () -> new byte[] {0x02, 0x03};
// Delay flow objectives OFFSET + rand(0, BOUND) millis
- private static final int OFFSET = 10; // ms
- private static final int BOUND = 40; // ms
+ private static final int DEFAULT_OFFSET = 10; // ms
+ private static final int DEFAULT_BOUND = 40; // ms
+ private static int offset = DEFAULT_OFFSET;
+ private static int bound = DEFAULT_BOUND;
private static final FilteringObjective FILT1 = buildFilteringObjective(P2, V3, M3, 1).add();
private static final FilteringObjective FILT2 = buildFilteringObjective(P2, V2, M2, 2).add();
@@ -124,6 +131,16 @@
private List<ForwardingObjective> expectFwdObjs = Lists.newCopyOnWriteArrayList(
Lists.newArrayList(FWD1, FWD2, FWD3, FWD4, FWD5, FWD6));
+ private static boolean timeout = false;
+ private static final ForwardingObjective FWD11 = buildFwdObjective(S1, NID2).add(new ObjectiveContext() {
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ timeout = error.equals(ObjectiveError.INSTALLATIONTIMEOUT);
+ }
+ });
+ private List<ForwardingObjective> expectFwdObjsTimeout = Lists.newCopyOnWriteArrayList(
+ Lists.newArrayList(FWD11, FWD1, FWD2));
+
private List<Objective> actualObjs = Lists.newCopyOnWriteArrayList();
private Pipeliner pipeliner = new PipelinerAdapter() {
@@ -163,7 +180,7 @@
*/
private void recordObjective(Objective obj) {
try {
- Thread.sleep(new Random().nextInt(BOUND) + OFFSET);
+ Thread.sleep(new Random().nextInt(bound) + offset);
actualObjs.add(obj);
obj.context().ifPresent(c -> c.onSuccess(obj));
} catch (Exception e) {
@@ -177,17 +194,25 @@
mgr = new InOrderFlowObjectiveManager();
mgr.pipeliners.put(DEV1, pipeliner);
mgr.executorService = newFixedThreadPool(4, groupedThreads("foo", "bar"));
+ mgr.cfgService = createMock(ComponentConfigService.class);
+ mgr.deviceService = createMock(DeviceService.class);
+ mgr.driverService = createMock(DriverService.class);
mgr.flowObjectiveStore = createMock(FlowObjectiveStore.class);
+ mgr.activate();
+ reset(mgr.flowObjectiveStore);
+ timeout = false;
+ offset = DEFAULT_OFFSET;
+ bound = DEFAULT_BOUND;
actualObjs.clear();
}
@Test
- public void filter() throws Exception {
+ public void filter() {
expectFiltObjs.forEach(filtObj -> mgr.filter(DEV1, filtObj));
// Wait for the pipeline operation to complete
- int expectedTime = (BOUND + OFFSET) * 7;
+ int expectedTime = (bound + offset) * 7;
assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFiltObjs.size(), actualObjs.size()));
assertTrue(actualObjs.indexOf(FILT1) < actualObjs.indexOf(FILT2));
@@ -198,7 +223,7 @@
}
@Test
- public void forward() throws Exception {
+ public void forward() {
expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(3);
expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(3);
replay(mgr.flowObjectiveStore);
@@ -206,7 +231,7 @@
expectFwdObjs.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));
// Wait for the pipeline operation to complete
- int expectedTime = (BOUND + OFFSET) * 6;
+ int expectedTime = (bound + offset) * 6;
assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjs.size(), actualObjs.size()));
assertTrue(actualObjs.indexOf(FWD1) < actualObjs.indexOf(FWD3));
@@ -218,7 +243,28 @@
}
@Test
- public void forwardPending() throws Exception {
+ public void forwardTimeout() {
+ expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(1);
+ expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(2);
+ replay(mgr.flowObjectiveStore);
+
+ // Force this objective to time out
+ offset = InOrderFlowObjectiveManager.OBJ_TIMEOUT_MS + 500;
+
+ expectFwdObjsTimeout.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));
+
+ // Wait for the pipeline operation to complete
+ int expectedTime = (bound + offset) * 3;
+ assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjsTimeout.size(), actualObjs.size()));
+
+ assertTrue(timeout);
+ assertTrue(actualObjs.indexOf(FWD11) < actualObjs.indexOf(FWD1));
+
+ verify(mgr.flowObjectiveStore);
+ }
+
+ @Test
+ public void forwardPending() {
// Note: current logic will double check if the next obj need to be queued
// it does not check when resubmitting pending next back to the queue
expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(null).times(2);
@@ -234,7 +280,7 @@
mgr.next(DEV1, NEXT2);
// Wait for the pipeline operation to complete
- int expectedTime = (BOUND + OFFSET) * 8;
+ int expectedTime = (bound + offset) * 8;
assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjs.size() + 2, actualObjs.size()));
assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(FWD1));
@@ -248,7 +294,7 @@
}
@Test
- public void next() throws Exception {
+ public void next() {
// Note: ADD operation won't query this
expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(3);
expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(3);
@@ -257,7 +303,7 @@
expectNextObjs.forEach(nextObj -> mgr.next(DEV1, nextObj));
// Wait for the pipeline operation to complete
- int expectedTime = (BOUND + OFFSET) * 8;
+ int expectedTime = (bound + offset) * 8;
assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectNextObjs.size(), actualObjs.size()));
assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(NEXT3));
@@ -276,7 +322,7 @@
// pendingForwards, pendingNexts and ordering queue caches.
@Test
@Ignore("Not supported")
- public void nextPending() throws Exception {
+ public void nextPending() {
// Note: current logic will double check if the next obj need to be queued
// it does not check when resubmitting pending next back to the queue
expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(null).times(6);
@@ -286,7 +332,7 @@
expectNextObjsPending.forEach(nextObj -> mgr.next(DEV1, nextObj));
// Wait for the pipeline operation to complete
- int expectedTime = (BOUND + OFFSET) * 8;
+ int expectedTime = (bound + offset) * 8;
assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectNextObjs.size(), actualObjs.size()));
assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(NEXT5));