Avoid head of queue blocking in InOrderFlowObjecitveManager
Change-Id: Ifec6f861634a0a7c335d0591861391b03c36f854
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 823891f..f32bb6c 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -16,6 +16,9 @@
package org.onosproject.net.flowobjective.impl;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
@@ -43,13 +46,25 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
@Component(immediate = true, enabled = true)
@Service
public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
private final Logger log = LoggerFactory.getLogger(getClass());
- // TODO Making these cache and timeout the entries
+ // TODO Make queue timeout configurable
+ static final int OBJ_TIMEOUT_MS = 5000;
+
+ private Cache<FiltObjQueueKey, Objective> filtObjQueueHead;
+ private Cache<FwdObjQueueKey, Objective> fwdObjQueueHead;
+ private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
+ private ScheduledExecutorService cacheCleaner;
+
private ListMultimap<FiltObjQueueKey, Objective> filtObjQueue =
Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
private ListMultimap<FwdObjQueueKey, Objective> fwdObjQueue =
@@ -62,14 +77,77 @@
@Activate
protected void activate() {
super.activate();
+
+ // TODO Clean up duplicated code
+ filtObjQueueHead = CacheBuilder.newBuilder()
+ .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .removalListener((RemovalNotification<FiltObjQueueKey, Objective> notification) -> {
+ Objective obj = notification.getValue();
+ switch (notification.getCause()) {
+ case EXPIRED:
+ case COLLECTED:
+ case SIZE:
+ obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+ break;
+ case EXPLICIT: // No action when the objective completes correctly
+ case REPLACED: // No action when a pending forward or next objective gets executed
+ default:
+ break;
+ }
+ }).build();
+ fwdObjQueueHead = CacheBuilder.newBuilder()
+ .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .removalListener((RemovalNotification<FwdObjQueueKey, Objective> notification) -> {
+ Objective obj = notification.getValue();
+ switch (notification.getCause()) {
+ case EXPIRED:
+ case COLLECTED:
+ case SIZE:
+ obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+ break;
+ case EXPLICIT: // No action when the objective completes correctly
+ case REPLACED: // No action when a pending forward or next objective gets executed
+ default:
+ break;
+ }
+ }).build();
+ nextObjQueueHead = CacheBuilder.newBuilder()
+ .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
+ Objective obj = notification.getValue();
+ switch (notification.getCause()) {
+ case EXPIRED:
+ case COLLECTED:
+ case SIZE:
+ obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
+ break;
+ case EXPLICIT: // No action when the objective completes correctly
+ case REPLACED: // No action when a pending forward or next objective gets executed
+ default:
+ break;
+ }
+ }).build();
+
+ cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
+ cacheCleaner.scheduleAtFixedRate(() -> {
+ filtObjQueueHead.cleanUp();
+ fwdObjQueueHead.cleanUp();
+ nextObjQueueHead.cleanUp();
+ }, 0, OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
// Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
- // process()
+ // execute()
flowObjectiveStore.unsetDelegate(super.delegate);
flowObjectiveStore.setDelegate(delegate);
}
@Deactivate
protected void deactivate() {
+ cacheCleaner.shutdown();
+ filtObjQueueHead.invalidateAll();
+ fwdObjQueueHead.invalidateAll();
+ nextObjQueueHead.invalidateAll();
+
super.deactivate();
}
@@ -93,7 +171,7 @@
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.trace("Flow objective onError {}", objective);
+ log.warn("Flow objective onError {}", objective);
dequeue(deviceId, objective);
originalContext.ifPresent(c -> c.onError(objective, error));
}
@@ -195,14 +273,17 @@
if (obj instanceof FilteringObjective) {
FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+ filtObjQueueHead.invalidate(k);
filtObjQueue.remove(k, obj);
remaining = filtObjQueue.get(k);
} else if (obj instanceof ForwardingObjective) {
FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+ fwdObjQueueHead.invalidate(k);
fwdObjQueue.remove(k, obj);
remaining = fwdObjQueue.get(k);
} else if (obj instanceof NextObjective) {
NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+ nextObjQueueHead.invalidate(k);
nextObjQueue.remove(k, obj);
remaining = nextObjQueue.get(k);
} else {
@@ -228,11 +309,18 @@
LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
+ int priority = obj.priority();
if (obj instanceof FilteringObjective) {
+ FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+ filtObjQueueHead.put(k, obj);
super.filter(deviceId, (FilteringObjective) obj);
} else if (obj instanceof ForwardingObjective) {
+ FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+ fwdObjQueueHead.put(k, obj);
super.forward(deviceId, (ForwardingObjective) obj);
} else if (obj instanceof NextObjective) {
+ NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+ nextObjQueueHead.put(k, obj);
super.next(deviceId, (NextObjective) obj);
} else {
log.error("Unknown flow objective instance: {}", obj.getClass().getName());
@@ -256,7 +344,7 @@
} else {
log.debug("Processing {} pending forwarding objectives for nextId {}",
pending.size(), event.subject());
- // resubmitted back to the execution queue
+ // execute pending forwards one by one
pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
}
@@ -274,7 +362,7 @@
} else {
log.debug("Processing {} pending next objectives for nextId {}",
pendNexts.size(), event.subject());
- // resubmitted back to the execution queue
+ // execute pending nexts one by one
pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
}
}