Made the time limit for event processing configurable; cleaned up duplicate code.

The default time limit is raised from 1000 ms to 2000 ms.

Change-Id: I08e7f1c9f4cdbd6404f1eb5e3544989e7a728d92
diff --git a/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java b/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
index 8e0de97..5c78bb4 100644
--- a/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
+++ b/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
@@ -17,12 +17,12 @@
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
 import org.onlab.util.SharedExecutors;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
@@ -32,9 +32,11 @@
 import org.onosproject.core.IdBlockStore;
 import org.onosproject.core.IdGenerator;
 import org.onosproject.core.Version;
+import org.onosproject.event.EventDeliveryService;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.util.Dictionary;
 import java.util.List;
@@ -64,9 +66,18 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
-    @Property(name = "sharedThreadPoolSize", intValue = 30,
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDeliveryService;
+
+    private static final int DEFAULT_POOL_SIZE = 30;
+    @Property(name = "sharedThreadPoolSize", intValue = DEFAULT_POOL_SIZE,
             label = "Configure shared pool maximum size ")
-    private int sharedThreadPoolSize = 30;
+    private int sharedThreadPoolSize = DEFAULT_POOL_SIZE;
+
+    private static final int DEFAULT_EVENT_TIME = 2000;
+    @Property(name = "maxEventTimeLimit", intValue = DEFAULT_EVENT_TIME,
+            label = "Maximum number of millis an event sink has to process an event")
+    private int maxEventTimeLimit = DEFAULT_EVENT_TIME;
 
     @Activate
     public void activate() {
@@ -121,30 +132,33 @@
     @Modified
     public void modified(ComponentContext context) {
         Dictionary<?, ?> properties = context.getProperties();
-        Integer sharedThreadPoolSizeConfig =
-                getIntegerProperty(properties, "sharedThreadPoolSize");
-        if (sharedThreadPoolSizeConfig == null) {
-            log.info("Shared Pool Size is not configured, default value is {}",
-                    sharedThreadPoolSize);
-        } else {
-            if (sharedThreadPoolSizeConfig > 0) {
-                sharedThreadPoolSize = sharedThreadPoolSizeConfig;
-                SharedExecutors.setPoolSize(sharedThreadPoolSize);
-                log.info("Configured. Shared Pool Size is configured to {}",
-                        sharedThreadPoolSize);
-            } else {
-                log.warn("Shared Pool Size size must be greater than 0");
-            }
-        }
-    }
+        Integer poolSize = getIntegerProperty(properties, "sharedThreadPoolSize");
 
+        if (poolSize != null && poolSize > 1) {
+            sharedThreadPoolSize = poolSize;
+            SharedExecutors.setPoolSize(sharedThreadPoolSize);
+        } else if (poolSize != null) {
+            log.warn("sharedThreadPoolSize must be greater than 1");
+        }
+
+        Integer timeLimit = getIntegerProperty(properties, "maxEventTimeLimit");
+        if (timeLimit != null && timeLimit > 1) {
+            eventDeliveryService.setDispatchTimeLimit(timeLimit); // throws if the value is rejected
+            maxEventTimeLimit = timeLimit; // record only what the dispatcher accepted
+        } else if (timeLimit != null) {
+            log.warn("maxEventTimeLimit must be greater than 1");
+        }
+
+        log.info("Settings: sharedThreadPoolSize={}, maxEventTimeLimit={}",
+                 sharedThreadPoolSize, maxEventTimeLimit);
+    }
 
 
     /**
      * Get Integer property from the propertyName
      * Return null if propertyName is not found.
      *
-     * @param properties properties to be looked up
+     * @param properties   properties to be looked up
      * @param propertyName the name of the property to look up
      * @return value when the propertyName is defined or return null
      */
diff --git a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
index 382846c..6d97460 100644
--- a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
+++ b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -45,12 +46,12 @@
 public class CoreEventDispatcher extends DefaultEventSinkRegistry
         implements EventDeliveryService {
 
-    // Maximum number of millis a sink can take to process an event.
-    private static final long MAX_EXECUTE_MS = 1_000;
-    private static final long WATCHDOG_MS = MAX_EXECUTE_MS / 4;
-
     private final Logger log = getLogger(getClass());
 
+    // Default number of millis a sink can take to process an event.
+    private static final long DEFAULT_EXECUTE_MS = 2_000; // ms
+    private static final long WATCHDOG_MS = 250; // ms; also the minimum accepted dispatch time limit
+
     private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
 
     private final ExecutorService executor =
@@ -61,6 +62,7 @@
     };
 
     private DispatchLoop dispatchLoop;
+    private long maxProcessMillis = DEFAULT_EXECUTE_MS;
 
     // Means to detect long-running sinks
     private TimerTask watchdog;
@@ -92,6 +94,18 @@
         log.info("Stopped");
     }
 
+    @Override
+    public void setDispatchTimeLimit(long millis) {
+        checkArgument(millis >= WATCHDOG_MS, // WATCHDOG_MS itself is accepted
+                      "Time limit must be at least %s ms", WATCHDOG_MS);
+        maxProcessMillis = millis;
+    }
+
+    @Override
+    public long getDispatchTimeLimit() {
+        return maxProcessMillis;
+    }
+
     // Auxiliary event dispatching loop that feeds off the events queue.
     private class DispatchLoop implements Runnable {
         private volatile boolean stopped;
@@ -126,7 +140,7 @@
                 lastStart = 0;
             } else {
                 log.warn("No sink registered for event class {}",
-                         event.getClass());
+                         event.getClass().getName());
             }
         }
 
@@ -140,7 +154,7 @@
         @Override
         public void run() {
             long delta = System.currentTimeMillis() - lastStart;
-            if (lastStart > 0 && delta > MAX_EXECUTE_MS) {
+            if (lastStart > 0 && delta > maxProcessMillis) {
                 log.error("Event sink {} exceeded execution time limit: {} ms",
                           lastSink.getClass().getName(), delta);