[ONOS-4004] DistributedStatisticStore and DistributedFlowStatisticStore make configurable
- Using @Property and @Modified annotations
- Fix DistributedPacketStore / DistributedStatisticStore / DistributedFlowStatisticStore

Change-Id: I6c907498496b9f21a8ef13b7badeb24770cb88ff
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index bab5cdf..630de79 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -21,6 +21,8 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
@@ -44,14 +46,20 @@
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+import java.util.Dictionary;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onlab.util.Tools.retryable;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -68,10 +76,7 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final int MAX_BACKOFF = 50;
-
-    // TODO: make this configurable.
-    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+    private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MastershipService mastershipService;
@@ -102,10 +107,17 @@
 
     private ExecutorService messageHandlingExecutor;
 
+    private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+    @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
+            label = "Size of thread pool to assign message handler")
+    private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
+    private static final int MAX_BACKOFF = 50;
+
     @Activate
     public void activate() {
         messageHandlingExecutor = Executors.newFixedThreadPool(
-                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                messageHandlerThreadPoolSize,
                 groupedThreads("onos/store/packet", "message-handlers"));
 
         communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
@@ -126,6 +138,33 @@
         log.info("Stopped");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+        int newMessageHandlerThreadPoolSize;
+
+        try {
+            String s = get(properties, "messageHandlerThreadPoolSize");
+
+            newMessageHandlerThreadPoolSize =
+                    isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException e) {
+            log.warn(e.getMessage());
+            newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
+        }
+
+        // Any change in the following parameters implies thread pool restart
+        if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
+            setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
+            restartMessageHandlerThreadPool();
+        }
+
+        log.info(FORMAT, messageHandlerThreadPoolSize);
+    }
+
+
     @Override
     public void emit(OutboundPacket packet) {
         NodeId myId = clusterService.getLocalNode().id();
@@ -239,4 +278,32 @@
             return list;
         }
     }
+
+    /**
+     * Sets thread pool size of message handler.
+     *
+     * @param poolSize
+     */
+    private void setMessageHandlerThreadPoolSize(int poolSize) {
+        checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
+        messageHandlerThreadPoolSize = poolSize;
+    }
+
+    /**
+     * Restarts thread pool of message handler.
+     */
+    private void restartMessageHandlerThreadPool() {
+        ExecutorService prevExecutor = messageHandlingExecutor;
+        messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
+        prevExecutor.shutdown();
+    }
+
+    /**
+     * Gets current thread pool size of message handler.
+     *
+     * @return messageHandlerThreadPoolSize
+     */
+    private int getMessageHandlerThreadPoolSize() {
+        return messageHandlerThreadPoolSize;
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
index cc14eba..8e59ef6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
@@ -20,6 +20,8 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
@@ -39,18 +41,24 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
 import java.util.Collections;
+import java.util.Dictionary;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
@@ -65,8 +73,7 @@
 public class DistributedFlowStatisticStore implements FlowStatisticStore {
     private final Logger log = getLogger(getClass());
 
-    // TODO: Make configurable.
-    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+    private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MastershipService mastershipService;
@@ -97,6 +104,12 @@
     private NodeId local;
     private ExecutorService messageHandlingExecutor;
 
+    private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+    @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
+            label = "Size of thread pool to assign message handler")
+    private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
+
     private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
 
     @Activate
@@ -104,7 +117,7 @@
         local = clusterService.getLocalNode().id();
 
         messageHandlingExecutor = Executors.newFixedThreadPool(
-                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                messageHandlerThreadPoolSize,
                 groupedThreads("onos/store/statistic", "message-handlers"));
 
         clusterCommunicator.addSubscriber(
@@ -126,6 +139,32 @@
         log.info("Stopped");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+        int newMessageHandlerThreadPoolSize;
+
+        try {
+            String s = get(properties, "messageHandlerThreadPoolSize");
+
+            newMessageHandlerThreadPoolSize =
+                    isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException e) {
+            log.warn(e.getMessage());
+            newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
+        }
+
+        // Any change in the following parameters implies thread pool restart
+        if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
+            setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
+            restartMessageHandlerThreadPool();
+        }
+
+        log.info(FORMAT, messageHandlerThreadPoolSize);
+    }
+
     @Override
     public synchronized void removeFlowStatistic(FlowRule rule) {
         ConnectPoint cp = buildConnectPoint(rule);
@@ -134,10 +173,16 @@
         }
 
         // remove this rule if present from current map
-        current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e;  });
+        current.computeIfPresent(cp, (c, e) -> {
+            e.remove(rule);
+            return e;
+        });
 
         // remove this on if present from previous map
-        previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+        previous.computeIfPresent(cp, (c, e) -> {
+            e.remove(rule);
+            return e;
+        });
     }
 
     @Override
@@ -286,4 +331,32 @@
         }
         return null;
     }
+
+    /**
+     * Sets thread pool size of message handler.
+     *
+     * @param poolSize
+     */
+    private void setMessageHandlerThreadPoolSize(int poolSize) {
+        checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
+        messageHandlerThreadPoolSize = poolSize;
+    }
+
+    /**
+     * Restarts thread pool of message handler.
+     */
+    private void restartMessageHandlerThreadPool() {
+        ExecutorService prevExecutor = messageHandlingExecutor;
+        messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
+        prevExecutor.shutdown();
+    }
+
+    /**
+     * Gets current thread pool size of message handler.
+     *
+     * @return messageHandlerThreadPoolSize
+     */
+    private int getMessageHandlerThreadPoolSize() {
+        return messageHandlerThreadPoolSize;
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index 35bdad1..a483194 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -20,6 +20,8 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
@@ -39,11 +41,14 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
 import java.util.Collections;
+import java.util.Dictionary;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -51,6 +56,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
@@ -67,8 +75,7 @@
 
     private final Logger log = getLogger(getClass());
 
-    // TODO: Make configurable.
-    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+    private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MastershipService mastershipService;
@@ -101,13 +108,18 @@
 
     private ExecutorService messageHandlingExecutor;
 
+    private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+    @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
+            label = "Size of thread pool to assign message handler")
+    private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
     private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
 
     @Activate
     public void activate() {
 
         messageHandlingExecutor = Executors.newFixedThreadPool(
-                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                messageHandlerThreadPoolSize,
                 groupedThreads("onos/store/statistic", "message-handlers"));
 
         clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
@@ -133,6 +145,33 @@
         log.info("Stopped");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+        int newMessageHandlerThreadPoolSize;
+
+        try {
+            String s = get(properties, "messageHandlerThreadPoolSize");
+
+            newMessageHandlerThreadPoolSize =
+                    isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException e) {
+            log.warn(e.getMessage());
+            newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
+        }
+
+        // Any change in the following parameters implies thread pool restart
+        if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
+            setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
+            restartMessageHandlerThreadPool();
+        }
+
+        log.info(FORMAT, messageHandlerThreadPoolSize);
+    }
+
+
     @Override
     public void prepareForStatistics(FlowRule rule) {
         ConnectPoint cp = buildConnectPoint(rule);
@@ -314,4 +353,32 @@
 
     }
 
+    /**
+     * Sets thread pool size of message handler.
+     *
+     * @param poolSize
+     */
+    private void setMessageHandlerThreadPoolSize(int poolSize) {
+        checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
+        messageHandlerThreadPoolSize = poolSize;
+    }
+
+    /**
+     * Restarts thread pool of message handler.
+     */
+    private void restartMessageHandlerThreadPool() {
+        ExecutorService prevExecutor = messageHandlingExecutor;
+        messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
+        prevExecutor.shutdown();
+    }
+
+    /**
+     * Gets current thread pool size of message handler.
+     *
+     * @return messageHandlerThreadPoolSize
+     */
+    private int getMessageHandlerThreadPoolSize() {
+        return messageHandlerThreadPoolSize;
+    }
+
 }