Adding configurable variables to DistributedFlowRuleStore
Change-Id: I989de6b4ca4caf8ac1e648fbae766d0f88414419
diff --git a/core/net/pom.xml b/core/net/pom.xml
index 27ca1a8..5647455 100644
--- a/core/net/pom.xml
+++ b/core/net/pom.xml
@@ -74,11 +74,6 @@
</dependency>
<dependency>
- <groupId>org.osgi</groupId>
- <artifactId>org.osgi.compendium</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
diff --git a/core/pom.xml b/core/pom.xml
index 5e2058b..97c33d8 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -52,6 +52,10 @@
<artifactId>onlab-junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index d972d2b..2906775 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -23,10 +23,13 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
+import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
@@ -68,12 +71,14 @@
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -88,6 +93,8 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
@@ -104,8 +111,17 @@
private final Logger log = getLogger(getClass());
- // TODO: Make configurable.
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+ private static final boolean DEFAULT_BACKUP_ENABLED = false;
+ private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+
+ @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ label = "Number of threads in the message handler pool")
+ private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
+ @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
+ label = "Indicates whether backups are enabled or not")
+ private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
private InternalFlowTable flowTable;
@@ -127,6 +143,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
+ @Reference(cardinality = MANDATORY_UNARY)
+ protected ComponentConfigService configService;
+
private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
private ExecutorService messageHandlingExecutor;
@@ -143,24 +162,22 @@
}
};
- private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
-
private ReplicaInfoEventListener replicaInfoEventListener;
private IdGenerator idGenerator;
@Activate
- public void activate() {
+ public void activate(ComponentContext context) {
+ configService.registerProperties(getClass());
- flowTable = new InternalFlowTable(); // .withBackupsEnabled(false);
+ flowTable = new InternalFlowTable().withBackupsEnabled(backupEnabled);
idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
final NodeId local = clusterService.getLocalNode().id();
messageHandlingExecutor = Executors.newFixedThreadPool(
- MESSAGE_HANDLER_THREAD_POOL_SIZE,
- groupedThreads("onos/store/flow", "message-handlers"));
+ msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
@@ -222,11 +239,12 @@
replicaInfoManager.addListener(replicaInfoEventListener);
- log.info("Started");
+ logConfig("Started");
}
@Deactivate
- public void deactivate() {
+ public void deactivate(ComponentContext context) {
+ configService.unregisterProperties(getClass(), false);
clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
@@ -237,6 +255,46 @@
log.info("Stopped");
}
+ @Modified
+ public void modified(ComponentContext context) {
+ if (context == null) {
+ backupEnabled = DEFAULT_BACKUP_ENABLED;
+ logConfig("Default config");
+ return;
+ }
+
+ Dictionary properties = context.getProperties();
+ int newPoolSize;
+ boolean newBackupEnabled;
+ try {
+ String s = (String) properties.get("msgHandlerPoolSize");
+ newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
+
+ s = (String) properties.get("backupEnabled");
+ newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
+
+ } catch (NumberFormatException | ClassCastException e) {
+ newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
+ newBackupEnabled = DEFAULT_BACKUP_ENABLED;
+ }
+
+ if (newPoolSize != msgHandlerPoolSize || newBackupEnabled != backupEnabled) {
+ msgHandlerPoolSize = newPoolSize;
+ backupEnabled = newBackupEnabled;
+ // reconfigure the store
+ flowTable.withBackupsEnabled(backupEnabled);
+ ExecutorService oldMsgHandler = messageHandlingExecutor;
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
+ oldMsgHandler.shutdown();
+ logConfig("Reconfigured");
+ }
+ }
+
+ private void logConfig(String prefix) {
+ log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
+ prefix, msgHandlerPoolSize, backupEnabled);
+ }
// This is not a efficient operation on a distributed sharded
// flow store. We need to revisit the need for this operation or at least
@@ -639,14 +697,13 @@
(flowId, flowEntry) ->
(flowEntry == null) ? null : deviceClockService.getTimestamp(flowEntry.deviceId());
- private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap = backupsEnabled ?
+ private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap =
new EventuallyConsistentMapImpl<>("flow-backup",
clusterService,
clusterCommunicator,
flowSerializer,
clockService,
- (key, flowEntry) -> getPeerNodes()).withTombstonesDisabled(true)
- : null;
+ (key, flowEntry) -> getPeerNodes()).withTombstonesDisabled(true);
private Collection<NodeId> getPeerNodes() {
List<NodeId> nodes = clusterService.getNodes()