[ONOS-4004] Make DistributedStatisticStore and DistributedFlowStatisticStore configurable
- Using @Property and @Modified annotations
- Fix DistributedPacketStore / DistributedStatisticStore / DistributedFlowStatisticStore

Change-Id: I6c907498496b9f21a8ef13b7badeb24770cb88ff
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index bab5cdf..630de79 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -21,6 +21,8 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
@@ -44,14 +46,20 @@
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+import java.util.Dictionary;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onlab.util.Tools.retryable;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -68,10 +76,7 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final int MAX_BACKOFF = 50;
-
-    // TODO: make this configurable.
-    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+    private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MastershipService mastershipService;
@@ -102,10 +107,17 @@
 
     private ExecutorService messageHandlingExecutor;
 
+    private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+    @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
+            label = "Size of thread pool to assign message handler")
+    private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
+
+    private static final int MAX_BACKOFF = 50;
+
     @Activate
     public void activate() {
         messageHandlingExecutor = Executors.newFixedThreadPool(
-                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                messageHandlerThreadPoolSize,
                 groupedThreads("onos/store/packet", "message-handlers"));
 
         communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
@@ -126,6 +138,33 @@
         log.info("Stopped");
     }
 
+    /**
+     * Applies a component configuration change, restarting the message
+     * handler thread pool when a new valid pool size is supplied.
+     *
+     * @param context component context; null is treated as no properties
+     */
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+        int newPoolSize = messageHandlerThreadPoolSize;
+        String s = get(properties, "messageHandlerThreadPoolSize");
+        try {
+            newPoolSize = isNullOrEmpty(s) ? newPoolSize : Integer.parseInt(s.trim());
+        } catch (NumberFormatException e) {
+            log.warn("Invalid messageHandlerThreadPoolSize '{}'; keeping {}", s, newPoolSize);
+        }
+        // Executors.newFixedThreadPool rejects sizes below 1, so refuse them before touching state
+        if (newPoolSize < 1) {
+            log.warn("messageHandlerThreadPoolSize must be positive, ignoring {}", newPoolSize);
+        } else if (newPoolSize != messageHandlerThreadPoolSize) {
+            // A changed size only takes effect in a freshly created pool
+            setMessageHandlerThreadPoolSize(newPoolSize);
+            restartMessageHandlerThreadPool();
+        }
+        log.info(FORMAT, messageHandlerThreadPoolSize);
+    }
+
     @Override
     public void emit(OutboundPacket packet) {
         NodeId myId = clusterService.getLocalNode().id();
@@ -239,4 +278,32 @@
             return list;
         }
     }
+
+    /**
+     * Sets thread pool size of message handler; a fixed pool needs at least one thread.
+     *
+     * @param poolSize number of handler threads, must be greater than 0
+     */
+    private void setMessageHandlerThreadPoolSize(int poolSize) {
+        checkArgument(poolSize > 0, "Message handler pool size must be greater than 0");
+        messageHandlerThreadPoolSize = poolSize;
+    }
+
+    /** Restarts thread pool of message handler, keeping the thread naming used at activation. */
+    private void restartMessageHandlerThreadPool() {
+        ExecutorService prevExecutor = messageHandlingExecutor;
+        messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize(),
+                groupedThreads("onos/store/packet", "message-handlers"));
+        // shutdown() lets already submitted tasks finish on the old pool
+        prevExecutor.shutdown();
+    }
+
+    /**
+     * Reports the size currently stored for the message handler thread pool.
+     *
+     * @return value of the messageHandlerThreadPoolSize setting
+     */
+    private int getMessageHandlerThreadPoolSize() {
+        return DistributedPacketStore.messageHandlerThreadPoolSize;
+    }
 }