Make number of threads in FlowObjectiveManager configurable

Change-Id: I2035b6c954d1c5d31dea85c84665e626cf3a66c7
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 8ce67cd..c14471a 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -20,12 +20,16 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.osgi.DefaultServiceDirectory;
 import org.onlab.osgi.ServiceDirectory;
 import org.onlab.util.ItemNotFoundException;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.behaviour.NextGroup;
@@ -49,6 +53,7 @@
 import org.onosproject.net.flowobjective.ObjectiveEvent;
 import org.onosproject.net.flowobjective.ObjectiveEvent.Type;
 import org.onosproject.net.group.GroupService;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +65,7 @@
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.security.AppGuard.checkPermission;
@@ -75,8 +81,18 @@
     public static final int INSTALL_RETRY_ATTEMPTS = 5;
     public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
 
+    private static final String WORKER_PATTERN = "objective-installer-%d";
+    private static final String GROUP_THREAD_NAME = "onos/objective-installer";
+    private static final String NUM_THREAD = "numThreads";
+
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final int DEFAULT_NUM_THREADS = 1;
+    @Property(name = NUM_THREAD,
+             intValue = DEFAULT_NUM_THREADS,
+             label = "Number of worker threads")
+    private int numThreads = DEFAULT_NUM_THREADS;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverService driverService;
 
@@ -98,6 +114,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowObjectiveStore flowObjectiveStore;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService cfgService;
+
     // Note: This must remain an optional dependency to allow re-install of default drivers.
     // Note: For now disabled until we can move to OPTIONAL_UNARY dependency
     // @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
@@ -124,7 +143,9 @@
 
     @Activate
     protected void activate() {
-        executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d", log));
+        cfgService.registerProperties(getClass());
+        executorService = newFixedThreadPool(numThreads,
+                                             groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
         flowObjectiveStore.setDelegate(delegate);
         deviceService.addListener(deviceListener);
         log.info("Started");
@@ -132,6 +153,7 @@
 
     @Deactivate
     protected void deactivate() {
+        cfgService.unregisterProperties(getClass(), false);
         flowObjectiveStore.unsetDelegate(delegate);
         deviceService.removeListener(deviceListener);
         executorService.shutdown();
@@ -141,6 +163,24 @@
         log.info("Stopped");
     }
 
+    @Modified
+    protected void modified(ComponentContext context) {
+        String propertyValue =
+                Tools.get(context.getProperties(), NUM_THREAD);
+        int newNumThreads = isNullOrEmpty(propertyValue) ? numThreads : Integer.parseInt(propertyValue);
+
+        if (newNumThreads != numThreads && newNumThreads > 0) {
+            numThreads = newNumThreads;
+            ExecutorService oldWorkerExecutor = executorService;
+            executorService = newFixedThreadPool(numThreads,
+                                                 groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+            if (oldWorkerExecutor != null) {
+                oldWorkerExecutor.shutdown();
+            }
+            log.info("Reconfigured number of worker threads to {}", numThreads);
+        }
+    }
+
     /**
      * Task that passes the flow objective down to the driver. The task will
      * make a few attempts to find the appropriate driver, then eventually give
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
index fe923dc..f4a12a9 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
@@ -23,6 +23,7 @@
 import org.junit.Test;
 import org.onlab.junit.TestUtils;
 import org.onlab.packet.ChassisId;
+import org.onosproject.cfg.ComponentConfigAdapter;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.DefaultDevice;
 import org.onosproject.net.Device;
@@ -183,6 +184,9 @@
         }
     }
 
+    private class TestComponentConfigService extends ComponentConfigAdapter {
+    }
+
     @Before
     public void initializeTest() {
         manager = new FlowObjectiveManager();
@@ -190,6 +194,7 @@
         manager.deviceService = new TestDeviceService();
         manager.defaultDriverService = new TestDriversLoader();
         manager.driverService = new TestDriverService();
+        manager.cfgService = new TestComponentConfigService();
 
         filteringObjectives = new ArrayList<>();
         forwardingObjectives = new ArrayList<>();