Adding configurable variables to DistributedFlowRuleStore

Change-Id: I989de6b4ca4caf8ac1e648fbae766d0f88414419
diff --git a/core/net/pom.xml b/core/net/pom.xml
index 27ca1a8..5647455 100644
--- a/core/net/pom.xml
+++ b/core/net/pom.xml
@@ -74,11 +74,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.osgi</groupId>
-            <artifactId>org.osgi.compendium</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.felix</groupId>
             <artifactId>org.apache.felix.scr.annotations</artifactId>
         </dependency>
diff --git a/core/pom.xml b/core/pom.xml
index 5e2058b..97c33d8 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -52,6 +52,10 @@
             <artifactId>onlab-junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index d972d2b..2906775 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -23,10 +23,13 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreService;
@@ -68,12 +71,14 @@
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.StoreSerializer;
 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -88,6 +93,8 @@
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
@@ -104,8 +111,17 @@
 
     private final Logger log = getLogger(getClass());
 
-    // TODO: Make configurable.
     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+    private static final boolean DEFAULT_BACKUP_ENABLED = false;
+    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+
+    @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
+            label = "Number of threads in the message handler pool")
+    private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
+    @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
+            label = "Indicates whether backups are enabled or not")
+    private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
 
     private InternalFlowTable flowTable;
 
@@ -127,6 +143,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
 
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected ComponentConfigService configService;
+
     private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
 
     private ExecutorService messageHandlingExecutor;
@@ -143,24 +162,22 @@
         }
     };
 
-    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
-
     private ReplicaInfoEventListener replicaInfoEventListener;
 
     private IdGenerator idGenerator;
 
     @Activate
-    public void activate() {
+    public void activate(ComponentContext context) {
+        configService.registerProperties(getClass());
 
-        flowTable = new InternalFlowTable(); // .withBackupsEnabled(false);
+        flowTable = new InternalFlowTable().withBackupsEnabled(backupEnabled);
 
         idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
 
         final NodeId local = clusterService.getLocalNode().id();
 
         messageHandlingExecutor = Executors.newFixedThreadPool(
-                MESSAGE_HANDLER_THREAD_POOL_SIZE,
-                groupedThreads("onos/store/flow", "message-handlers"));
+                msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
 
         clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
 
@@ -222,11 +239,12 @@
 
         replicaInfoManager.addListener(replicaInfoEventListener);
 
-        log.info("Started");
+        logConfig("Started");
     }
 
     @Deactivate
-    public void deactivate() {
+    public void deactivate(ComponentContext context) {
+        configService.unregisterProperties(getClass(), false);
         clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
         clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
         clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
@@ -237,6 +255,46 @@
         log.info("Stopped");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        if (context == null) {
+            backupEnabled = DEFAULT_BACKUP_ENABLED;
+            logConfig("Default config");
+            return;
+        }
+
+        Dictionary properties = context.getProperties();
+        int newPoolSize;
+        boolean newBackupEnabled;
+        try {
+            String s = (String) properties.get("msgHandlerPoolSize");
+            newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
+
+            s = (String) properties.get("backupEnabled");
+            newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
+
+        } catch (NumberFormatException | ClassCastException e) {
+            newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
+            newBackupEnabled = DEFAULT_BACKUP_ENABLED;
+        }
+
+        if (newPoolSize != msgHandlerPoolSize || newBackupEnabled != backupEnabled) {
+            msgHandlerPoolSize = newPoolSize;
+            backupEnabled = newBackupEnabled;
+            // reconfigure the store
+            flowTable.withBackupsEnabled(backupEnabled);
+            ExecutorService oldMsgHandler = messageHandlingExecutor;
+            messageHandlingExecutor = Executors.newFixedThreadPool(
+                    msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
+            oldMsgHandler.shutdown();
+            logConfig("Reconfigured");
+        }
+    }
+
+    private void logConfig(String prefix) {
+        log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
+                 prefix, msgHandlerPoolSize, backupEnabled);
+    }
 
     // This is not a efficient operation on a distributed sharded
     // flow store. We need to revisit the need for this operation or at least
@@ -639,14 +697,13 @@
                 (flowId, flowEntry) ->
                         (flowEntry == null) ? null : deviceClockService.getTimestamp(flowEntry.deviceId());
 
-        private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap = backupsEnabled ?
+        private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap =
                 new EventuallyConsistentMapImpl<>("flow-backup",
                         clusterService,
                         clusterCommunicator,
                         flowSerializer,
                         clockService,
-                        (key, flowEntry) -> getPeerNodes()).withTombstonesDisabled(true)
-                      : null;
+                        (key, flowEntry) -> getPeerNodes()).withTombstonesDisabled(true);
 
         private Collection<NodeId> getPeerNodes() {
             List<NodeId> nodes = clusterService.getNodes()