Make number of threads in FlowObjectiveManager configurable
Change-Id: I2035b6c954d1c5d31dea85c84665e626cf3a66c7
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 8ce67cd..c14471a 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -20,12 +20,16 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.osgi.ServiceDirectory;
import org.onlab.util.ItemNotFoundException;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.NextGroup;
@@ -49,6 +53,7 @@
import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.net.flowobjective.ObjectiveEvent.Type;
import org.onosproject.net.group.GroupService;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +65,7 @@
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.security.AppGuard.checkPermission;
@@ -75,8 +81,18 @@
public static final int INSTALL_RETRY_ATTEMPTS = 5;
public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
+ private static final String WORKER_PATTERN = "objective-installer-%d";
+ private static final String GROUP_THREAD_NAME = "onos/objective-installer";
+ private static final String NUM_THREAD = "numThreads";
+
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final int DEFAULT_NUM_THREADS = 1;
+ @Property(name = NUM_THREAD,
+ intValue = DEFAULT_NUM_THREADS,
+ label = "Number of worker threads")
+ private int numThreads = DEFAULT_NUM_THREADS;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverService driverService;
@@ -98,6 +114,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowObjectiveStore flowObjectiveStore;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService cfgService;
+
// Note: This must remain an optional dependency to allow re-install of default drivers.
// Note: For now disabled until we can move to OPTIONAL_UNARY dependency
// @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
@@ -124,7 +143,9 @@
@Activate
protected void activate() {
- executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d", log));
+ cfgService.registerProperties(getClass());
+ executorService = newFixedThreadPool(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
flowObjectiveStore.setDelegate(delegate);
deviceService.addListener(deviceListener);
log.info("Started");
@@ -132,6 +153,7 @@
@Deactivate
protected void deactivate() {
+ cfgService.unregisterProperties(getClass(), false);
flowObjectiveStore.unsetDelegate(delegate);
deviceService.removeListener(deviceListener);
executorService.shutdown();
@@ -141,6 +163,24 @@
log.info("Stopped");
}
+ @Modified
+ protected void modified(ComponentContext context) {
+ String propertyValue =
+ Tools.get(context.getProperties(), NUM_THREAD);
+ int newNumThreads = isNullOrEmpty(propertyValue) ? numThreads : Integer.parseInt(propertyValue);
+
+ if (newNumThreads != numThreads && newNumThreads > 0) {
+ numThreads = newNumThreads;
+ ExecutorService oldWorkerExecutor = executorService;
+ executorService = newFixedThreadPool(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+ if (oldWorkerExecutor != null) {
+ oldWorkerExecutor.shutdown();
+ }
+ log.info("Reconfigured number of worker threads to {}", numThreads);
+ }
+ }
+
/**
* Task that passes the flow objective down to the driver. The task will
* make a few attempts to find the appropriate driver, then eventually give
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
index fe923dc..f4a12a9 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
@@ -23,6 +23,7 @@
import org.junit.Test;
import org.onlab.junit.TestUtils;
import org.onlab.packet.ChassisId;
+import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultDevice;
import org.onosproject.net.Device;
@@ -183,6 +184,9 @@
}
}
+ private class TestComponentConfigService extends ComponentConfigAdapter {
+ }
+
@Before
public void initializeTest() {
manager = new FlowObjectiveManager();
@@ -190,6 +194,7 @@
manager.deviceService = new TestDeviceService();
manager.defaultDriverService = new TestDriversLoader();
manager.driverService = new TestDriverService();
+ manager.cfgService = new TestComponentConfigService();
filteringObjectives = new ArrayList<>();
forwardingObjectives = new ArrayList<>();