Improves VERIFY operations
Changes:
- Avoids sending duplicate next objectives when there are multiple sources
- Introduces a backpressure mechanism to avoid flooding the pipeliners
- Guarantees at least 30s between consecutive mcast corrector
executions
- Introduces a pool of 4 verifiers in FlowObjectiveManager so that
verification runs on threads separate from those used for the
installation/removal of the FlowObjectives
- Improves logging in verifyGroup
Change-Id: I45aac0f80c9eb6afd763f21977d62df4a98f686e
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 3c5c571..4ba0d6e 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -105,8 +105,9 @@
private static final int INSTALL_RETRY_ATTEMPTS = 5;
private static final long INSTALL_RETRY_INTERVAL = 1000; // ms
- private static final String WORKER_PATTERN = "objective-installer-%d";
- private static final String GROUP_THREAD_NAME = "onos/objective-installer";
+ private static final String INSTALLER_PATTERN = "installer-%d";
+ private static final String VERIFIER_PATTERN = "verifier-%d";
+ private static final String GROUP_THREAD_NAME = "onos/objective";
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -174,14 +175,18 @@
// for debugging purposes
private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
- ExecutorService executorService;
+ ExecutorService installerExecutor;
+ ExecutorService verifierExecutor;
protected ExecutorService devEventExecutor;
@Activate
protected void activate(ComponentContext context) {
cfgService.registerProperties(FlowObjectiveManager.class);
- executorService = newFixedThreadPool(numThreads,
- groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+ installerExecutor = newFixedThreadPool(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, INSTALLER_PATTERN, log));
+ verifierExecutor = newFixedThreadPool(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, VERIFIER_PATTERN, log));
+
modified(context);
devEventExecutor = newSingleThreadScheduledExecutor(
groupedThreads("onos/flowobj-dev-events", "events-%d", log));
@@ -197,7 +202,8 @@
flowObjectiveStore.unsetDelegate(delegate);
deviceService.removeListener(deviceListener);
driverService.removeListener(driverListener);
- executorService.shutdown();
+ installerExecutor.shutdown();
+ verifierExecutor.shutdown();
devEventExecutor.shutdownNow();
devEventExecutor = null;
pipeliners.clear();
@@ -224,9 +230,15 @@
if (newNumThreads != numThreads && newNumThreads > 0) {
numThreads = newNumThreads;
- ExecutorService oldWorkerExecutor = executorService;
- executorService = newFixedThreadPool(numThreads,
- groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+ ExecutorService oldWorkerExecutor = installerExecutor;
+ installerExecutor = newFixedThreadPool(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, INSTALLER_PATTERN, log));
+ if (oldWorkerExecutor != null) {
+ oldWorkerExecutor.shutdown();
+ }
+ oldWorkerExecutor = verifierExecutor;
+ verifierExecutor = newFixedThreadPool(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, VERIFIER_PATTERN, log));
if (oldWorkerExecutor != null) {
oldWorkerExecutor.shutdown();
}
@@ -269,20 +281,24 @@
* make a few attempts to find the appropriate driver, then eventually give
* up and report an error if no suitable driver could be found.
*/
- class ObjectiveInstaller implements Runnable {
+ class ObjectiveProcessor implements Runnable {
final DeviceId deviceId;
final Objective objective;
+ final ExecutorService executor;
private final int numAttempts;
- ObjectiveInstaller(DeviceId deviceId, Objective objective) {
- this(deviceId, objective, 1);
+ ObjectiveProcessor(DeviceId deviceId, Objective objective,
+ ExecutorService executorService) {
+ this(deviceId, objective, 1, executorService);
}
- ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
+ ObjectiveProcessor(DeviceId deviceId, Objective objective, int attempts,
+ ExecutorService executorService) {
this.deviceId = checkNotNull(deviceId);
this.objective = checkNotNull(objective);
- this.numAttempts = attemps;
+ this.executor = checkNotNull(executorService);
+ this.numAttempts = attempts;
}
@Override
@@ -302,7 +318,8 @@
//Attempts to check if pipeliner is null for retry attempts
} else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
Thread.sleep(INSTALL_RETRY_INTERVAL);
- executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
+ executor.execute(new ObjectiveProcessor(deviceId, objective,
+ numAttempts + 1, executor));
} else {
// Otherwise we've tried a few times and failed, report an
// error back to the user.
@@ -311,7 +328,7 @@
}
//Exception thrown
} catch (Exception e) {
- log.warn("Exception while installing flow objective", e);
+ log.warn("Exception while processing flow objective", e);
}
}
}
@@ -319,7 +336,7 @@
@Override
public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
checkPermission(FLOWRULE_WRITE);
- executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
+ installerExecutor.execute(new ObjectiveProcessor(deviceId, filteringObjective, installerExecutor));
}
@Override
@@ -329,19 +346,21 @@
flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null ||
!queueFwdObjective(deviceId, forwardingObjective)) {
// fast path
- executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
+ installerExecutor.execute(new ObjectiveProcessor(deviceId, forwardingObjective, installerExecutor));
}
}
@Override
public void next(DeviceId deviceId, NextObjective nextObjective) {
checkPermission(FLOWRULE_WRITE);
- if (nextObjective.op() == Operation.ADD ||
- nextObjective.op() == Operation.VERIFY ||
+ if (nextObjective.op() == Operation.VERIFY) {
+ // Verify does not need to wait
+ verifierExecutor.execute(new ObjectiveProcessor(deviceId, nextObjective, verifierExecutor));
+ } else if (nextObjective.op() == Operation.ADD ||
flowObjectiveStore.getNextGroup(nextObjective.id()) != null ||
!queueNextObjective(deviceId, nextObjective)) {
// either group exists or we are trying to create it - let it through
- executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
+ installerExecutor.execute(new ObjectiveProcessor(deviceId, nextObjective, installerExecutor));
}
}