[ONOS-4004] DistributedStatisticStore and DistributedFlowStatisticStore make configurable
- Using @Property and @Modified annotations
- Fix DistributedPacketStore / DistributedStatisticStore / DistributedFlowStatisticStore
Change-Id: I6c907498496b9f21a8ef13b7badeb24770cb88ff
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index bab5cdf..630de79 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -21,6 +21,8 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
@@ -44,14 +46,20 @@
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
+import java.util.Dictionary;
import java.util.List;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.retryable;
import static org.slf4j.LoggerFactory.getLogger;
@@ -68,10 +76,7 @@
private final Logger log = getLogger(getClass());
- private static final int MAX_BACKOFF = 50;
-
- // TODO: make this configurable.
- private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+ private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@@ -102,10 +107,17 @@
private ExecutorService messageHandlingExecutor;
+ private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+ @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ label = "Size of thread pool to assign message handler")
+ private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
+ private static final int MAX_BACKOFF = 50;
+
@Activate
public void activate() {
messageHandlingExecutor = Executors.newFixedThreadPool(
- MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ messageHandlerThreadPoolSize,
groupedThreads("onos/store/packet", "message-handlers"));
communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
@@ -126,6 +138,33 @@
log.info("Stopped");
}
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+ int newMessageHandlerThreadPoolSize;
+
+ try {
+ String s = get(properties, "messageHandlerThreadPoolSize");
+
+ newMessageHandlerThreadPoolSize =
+ isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
+
+ } catch (NumberFormatException e) {
+ log.warn(e.getMessage());
+ newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
+ }
+
+ // Any change in the following parameters implies thread pool restart
+ if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
+ setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
+ restartMessageHandlerThreadPool();
+ }
+
+ log.info(FORMAT, messageHandlerThreadPoolSize);
+ }
+
+
@Override
public void emit(OutboundPacket packet) {
NodeId myId = clusterService.getLocalNode().id();
@@ -239,4 +278,32 @@
return list;
}
}
+
+ /**
+ * Sets thread pool size of message handler.
+ *
+ * @param poolSize
+ */
+ private void setMessageHandlerThreadPoolSize(int poolSize) {
+ checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
+ messageHandlerThreadPoolSize = poolSize;
+ }
+
+ /**
+ * Restarts thread pool of message handler.
+ */
+ private void restartMessageHandlerThreadPool() {
+ ExecutorService prevExecutor = messageHandlingExecutor;
+ messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
+ prevExecutor.shutdown();
+ }
+
+ /**
+ * Gets current thread pool size of message handler.
+ *
+ * @return messageHandlerThreadPoolSize
+ */
+ private int getMessageHandlerThreadPoolSize() {
+ return messageHandlerThreadPoolSize;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
index cc14eba..8e59ef6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
@@ -20,6 +20,8 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
@@ -39,18 +41,24 @@
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.Collections;
+import java.util.Dictionary;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
@@ -65,8 +73,7 @@
public class DistributedFlowStatisticStore implements FlowStatisticStore {
private final Logger log = getLogger(getClass());
- // TODO: Make configurable.
- private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+ private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@@ -97,6 +104,12 @@
private NodeId local;
private ExecutorService messageHandlingExecutor;
+ private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+ @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ label = "Size of thread pool to assign message handler")
+ private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
+
private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
@Activate
@@ -104,7 +117,7 @@
local = clusterService.getLocalNode().id();
messageHandlingExecutor = Executors.newFixedThreadPool(
- MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ messageHandlerThreadPoolSize,
groupedThreads("onos/store/statistic", "message-handlers"));
clusterCommunicator.addSubscriber(
@@ -126,6 +139,32 @@
log.info("Stopped");
}
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+ int newMessageHandlerThreadPoolSize;
+
+ try {
+ String s = get(properties, "messageHandlerThreadPoolSize");
+
+ newMessageHandlerThreadPoolSize =
+ isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
+
+ } catch (NumberFormatException e) {
+ log.warn(e.getMessage());
+ newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
+ }
+
+ // Any change in the following parameters implies thread pool restart
+ if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
+ setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
+ restartMessageHandlerThreadPool();
+ }
+
+ log.info(FORMAT, messageHandlerThreadPoolSize);
+ }
+
@Override
public synchronized void removeFlowStatistic(FlowRule rule) {
ConnectPoint cp = buildConnectPoint(rule);
@@ -134,10 +173,16 @@
}
// remove this rule if present from current map
- current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+ current.computeIfPresent(cp, (c, e) -> {
+ e.remove(rule);
+ return e;
+ });
// remove this on if present from previous map
- previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+ previous.computeIfPresent(cp, (c, e) -> {
+ e.remove(rule);
+ return e;
+ });
}
@Override
@@ -286,4 +331,32 @@
}
return null;
}
+
+ /**
+ * Sets thread pool size of message handler.
+ *
+ * @param poolSize
+ */
+ private void setMessageHandlerThreadPoolSize(int poolSize) {
+ checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
+ messageHandlerThreadPoolSize = poolSize;
+ }
+
+ /**
+ * Restarts thread pool of message handler.
+ */
+ private void restartMessageHandlerThreadPool() {
+ ExecutorService prevExecutor = messageHandlingExecutor;
+ messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
+ prevExecutor.shutdown();
+ }
+
+ /**
+ * Gets current thread pool size of message handler.
+ *
+ * @return messageHandlerThreadPoolSize
+ */
+ private int getMessageHandlerThreadPoolSize() {
+ return messageHandlerThreadPoolSize;
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index 35bdad1..a483194 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -20,6 +20,8 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
@@ -39,11 +41,14 @@
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.Collections;
+import java.util.Dictionary;
import java.util.HashSet;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -51,6 +56,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
@@ -67,8 +75,7 @@
private final Logger log = getLogger(getClass());
- // TODO: Make configurable.
- private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+ private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@@ -101,13 +108,18 @@
private ExecutorService messageHandlingExecutor;
+ private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+ @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ label = "Size of thread pool to assign message handler")
+ private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
@Activate
public void activate() {
messageHandlingExecutor = Executors.newFixedThreadPool(
- MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ messageHandlerThreadPoolSize,
groupedThreads("onos/store/statistic", "message-handlers"));
clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
@@ -133,6 +145,33 @@
log.info("Stopped");
}
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+ int newMessageHandlerThreadPoolSize;
+
+ try {
+ String s = get(properties, "messageHandlerThreadPoolSize");
+
+ newMessageHandlerThreadPoolSize =
+ isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
+
+ } catch (NumberFormatException e) {
+ log.warn(e.getMessage());
+ newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
+ }
+
+ // Any change in the following parameters implies thread pool restart
+ if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
+ setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
+ restartMessageHandlerThreadPool();
+ }
+
+ log.info(FORMAT, messageHandlerThreadPoolSize);
+ }
+
+
@Override
public void prepareForStatistics(FlowRule rule) {
ConnectPoint cp = buildConnectPoint(rule);
@@ -314,4 +353,32 @@
}
+ /**
+ * Sets thread pool size of message handler.
+ *
+ * @param poolSize
+ */
+ private void setMessageHandlerThreadPoolSize(int poolSize) {
+ checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
+ messageHandlerThreadPoolSize = poolSize;
+ }
+
+ /**
+ * Restarts thread pool of message handler.
+ */
+ private void restartMessageHandlerThreadPool() {
+ ExecutorService prevExecutor = messageHandlingExecutor;
+ messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
+ prevExecutor.shutdown();
+ }
+
+ /**
+ * Gets current thread pool size of message handler.
+ *
+ * @return messageHandlerThreadPoolSize
+ */
+ private int getMessageHandlerThreadPoolSize() {
+ return messageHandlerThreadPoolSize;
+ }
+
}