Made time limit for event processing configurable; cleaned-up duplicate code.

Change-Id: I08e7f1c9f4cdbd6404f1eb5e3544989e7a728d92
diff --git a/core/api/src/main/java/org/onosproject/event/EventDeliveryService.java b/core/api/src/main/java/org/onosproject/event/EventDeliveryService.java
index 2d9d38ce..ff26893 100644
--- a/core/api/src/main/java/org/onosproject/event/EventDeliveryService.java
+++ b/core/api/src/main/java/org/onosproject/event/EventDeliveryService.java
@@ -20,4 +20,19 @@
  * then dispatching them to the appropriate event sink.
  */
 public interface EventDeliveryService extends EventDispatcher, EventSinkRegistry {
+
+    /**
+     * Sets the number of millis that an event sink has to process an event.
+     *
+     * @param millis number of millis allowed per sink per event
+     */
+    void setDispatchTimeLimit(long millis);
+
+    /**
+     * Returns the number of millis that an event sink has to process an event.
+     *
+     * @return number of millis allowed per sink per event
+     */
+    long getDispatchTimeLimit();
+
 }
diff --git a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java b/core/common/src/test/java/org/onosproject/common/event/impl/TestEventDispatcher.java
similarity index 87%
rename from core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
rename to core/common/src/test/java/org/onosproject/common/event/impl/TestEventDispatcher.java
index 91a3e53..4ea371a 100644
--- a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
+++ b/core/common/src/test/java/org/onosproject/common/event/impl/TestEventDispatcher.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.event.impl;
+package org.onosproject.common.event.impl;
 
 import org.onosproject.event.DefaultEventSinkRegistry;
 import org.onosproject.event.Event;
@@ -37,4 +37,12 @@
         sink.process(event);
     }
 
+    @Override
+    public void setDispatchTimeLimit(long millis) {
+    }
+
+    @Override
+    public long getDispatchTimeLimit() {
+        return 0;
+    }
 }
diff --git a/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java b/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
index 8e0de97..5c78bb4 100644
--- a/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
+++ b/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
@@ -17,12 +17,12 @@
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
 import org.onlab.util.SharedExecutors;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
@@ -32,9 +32,11 @@
 import org.onosproject.core.IdBlockStore;
 import org.onosproject.core.IdGenerator;
 import org.onosproject.core.Version;
+import org.onosproject.event.EventDeliveryService;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.util.Dictionary;
 import java.util.List;
@@ -64,9 +66,18 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
-    @Property(name = "sharedThreadPoolSize", intValue = 30,
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDeliveryService;
+
+    private static final int DEFAULT_POOL_SIZE = 30;
+    @Property(name = "sharedThreadPoolSize", intValue = DEFAULT_POOL_SIZE,
             label = "Configure shared pool maximum size ")
-    private int sharedThreadPoolSize = 30;
+    private int sharedThreadPoolSize = DEFAULT_POOL_SIZE;
+
+    private static final int DEFAULT_EVENT_TIME = 2000;
+    @Property(name = "maxEventTimeLimit", intValue = DEFAULT_EVENT_TIME,
+            label = "Maximum number of millis an event sink has to process an event")
+    private int maxEventTimeLimit = DEFAULT_EVENT_TIME;
 
     @Activate
     public void activate() {
@@ -121,30 +132,33 @@
     @Modified
     public void modified(ComponentContext context) {
         Dictionary<?, ?> properties = context.getProperties();
-        Integer sharedThreadPoolSizeConfig =
-                getIntegerProperty(properties, "sharedThreadPoolSize");
-        if (sharedThreadPoolSizeConfig == null) {
-            log.info("Shared Pool Size is not configured, default value is {}",
-                    sharedThreadPoolSize);
-        } else {
-            if (sharedThreadPoolSizeConfig > 0) {
-                sharedThreadPoolSize = sharedThreadPoolSizeConfig;
-                SharedExecutors.setPoolSize(sharedThreadPoolSize);
-                log.info("Configured. Shared Pool Size is configured to {}",
-                        sharedThreadPoolSize);
-            } else {
-                log.warn("Shared Pool Size size must be greater than 0");
-            }
-        }
-    }
+        Integer poolSize = getIntegerProperty(properties, "sharedThreadPoolSize");
 
+        if (poolSize != null && poolSize > 1) {
+            sharedThreadPoolSize = poolSize;
+            SharedExecutors.setPoolSize(sharedThreadPoolSize);
+        } else if (poolSize != null) {
+            log.warn("sharedThreadPoolSize must be greater than 1");
+        }
+
+        Integer timeLimit = getIntegerProperty(properties, "maxEventTimeLimit");
+        if (timeLimit != null && timeLimit > 1) {
+            maxEventTimeLimit = timeLimit;
+            eventDeliveryService.setDispatchTimeLimit(maxEventTimeLimit);
+        } else if (timeLimit != null) {
+            log.warn("maxEventTimeLimit must be greater than 1");
+        }
+
+        log.info("Settings: sharedThreadPoolSize={}, maxEventTimeLimit={}",
+                 sharedThreadPoolSize, maxEventTimeLimit);
+    }
 
 
     /**
      * Get Integer property from the propertyName
      * Return null if propertyName is not found.
      *
-     * @param properties properties to be looked up
+     * @param properties   properties to be looked up
      * @param propertyName the name of the property to look up
      * @return value when the propertyName is defined or return null
      */
diff --git a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
index 382846c..6d97460 100644
--- a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
+++ b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -45,12 +46,12 @@
 public class CoreEventDispatcher extends DefaultEventSinkRegistry
         implements EventDeliveryService {
 
-    // Maximum number of millis a sink can take to process an event.
-    private static final long MAX_EXECUTE_MS = 1_000;
-    private static final long WATCHDOG_MS = MAX_EXECUTE_MS / 4;
-
     private final Logger log = getLogger(getClass());
 
+    // Default number of millis a sink can take to process an event.
+    private static final long DEFAULT_EXECUTE_MS = 2_000; // ms
+    private static final long WATCHDOG_MS = 250; // ms
+
     private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
 
     private final ExecutorService executor =
@@ -61,6 +62,7 @@
     };
 
     private DispatchLoop dispatchLoop;
+    private long maxProcessMillis = DEFAULT_EXECUTE_MS;
 
     // Means to detect long-running sinks
     private TimerTask watchdog;
@@ -92,6 +94,18 @@
         log.info("Stopped");
     }
 
+    @Override
+    public void setDispatchTimeLimit(long millis) {
+        checkArgument(millis >= WATCHDOG_MS,
+                      "Time limit must be greater than %s", WATCHDOG_MS);
+        maxProcessMillis = millis;
+    }
+
+    @Override
+    public long getDispatchTimeLimit() {
+        return maxProcessMillis;
+    }
+
     // Auxiliary event dispatching loop that feeds off the events queue.
     private class DispatchLoop implements Runnable {
         private volatile boolean stopped;
@@ -126,7 +140,7 @@
                 lastStart = 0;
             } else {
                 log.warn("No sink registered for event class {}",
-                         event.getClass());
+                         event.getClass().getName());
             }
         }
 
@@ -140,7 +154,7 @@
         @Override
         public void run() {
             long delta = System.currentTimeMillis() - lastStart;
-            if (lastStart > 0 && delta > MAX_EXECUTE_MS) {
+            if (lastStart > 0 && delta > maxProcessMillis) {
                 log.error("Event sink {} exceeded execution time limit: {} ms",
                           lastSink.getClass().getName(), delta);
 
diff --git a/core/net/src/test/java/org/onosproject/app/impl/ApplicationManagerTest.java b/core/net/src/test/java/org/onosproject/app/impl/ApplicationManagerTest.java
index 0be1ea8..4b4bbee 100644
--- a/core/net/src/test/java/org/onosproject/app/impl/ApplicationManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/app/impl/ApplicationManagerTest.java
@@ -28,7 +28,7 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.DefaultApplication;
 import org.onosproject.core.DefaultApplicationId;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 
 import java.io.InputStream;
 import java.net.URI;
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
index b5ae31b..e4d4331 100644
--- a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
@@ -26,7 +26,7 @@
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.mastership.MastershipStore;
 import org.onosproject.mastership.MastershipTermService;
diff --git a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
index d6bfda2..0b6c3e0 100644
--- a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
@@ -31,7 +31,7 @@
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Event;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.mastership.MastershipTerm;
 import org.onosproject.mastership.MastershipTermService;
diff --git a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
index c8a32f0..1113b2b 100644
--- a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
@@ -29,7 +29,7 @@
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.core.DefaultApplicationId;
 import org.onosproject.core.IdGenerator;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.net.DefaultDevice;
 import org.onosproject.net.Device;
 import org.onosproject.net.Device.Type;
diff --git a/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java b/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
index 6b5929a..43ce78a 100644
--- a/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
@@ -35,7 +35,7 @@
 import org.onosproject.core.DefaultApplicationId;
 import org.onosproject.core.DefaultGroupId;
 import org.onosproject.core.GroupId;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.impl.DeviceManager;
diff --git a/core/net/src/test/java/org/onosproject/net/host/impl/HostManagerTest.java b/core/net/src/test/java/org/onosproject/net/host/impl/HostManagerTest.java
index e28f193..a6e8e62 100644
--- a/core/net/src/test/java/org/onosproject/net/host/impl/HostManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/host/impl/HostManagerTest.java
@@ -36,7 +36,7 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onosproject.event.Event;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Host;
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
index 66bf476..d48d107 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
@@ -30,7 +30,7 @@
 import org.onosproject.cfg.ComponentConfigAdapter;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.impl.TestCoreManager;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.net.NetworkResource;
 import org.onosproject.net.intent.FlowRuleIntent;
 import org.onosproject.net.intent.Intent;
diff --git a/core/net/src/test/java/org/onosproject/net/link/impl/LinkManagerTest.java b/core/net/src/test/java/org/onosproject/net/link/impl/LinkManagerTest.java
index ac6d1eb..cacf082 100644
--- a/core/net/src/test/java/org/onosproject/net/link/impl/LinkManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/link/impl/LinkManagerTest.java
@@ -37,7 +37,7 @@
 import org.onosproject.net.link.LinkService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.net.device.impl.DeviceManager;
 import org.onosproject.store.trivial.impl.SimpleLinkStore;
 
diff --git a/core/net/src/test/java/org/onosproject/net/topology/impl/DefaultTopologyProviderTest.java b/core/net/src/test/java/org/onosproject/net/topology/impl/DefaultTopologyProviderTest.java
index 91d0531..a54d075 100644
--- a/core/net/src/test/java/org/onosproject/net/topology/impl/DefaultTopologyProviderTest.java
+++ b/core/net/src/test/java/org/onosproject/net/topology/impl/DefaultTopologyProviderTest.java
@@ -22,7 +22,7 @@
 import org.junit.Test;
 import org.onosproject.cfg.ComponentConfigAdapter;
 import org.onosproject.event.Event;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.net.Device;
 import org.onosproject.net.Link;
 import org.onosproject.net.device.DeviceEvent;
diff --git a/core/net/src/test/java/org/onosproject/net/topology/impl/TopologyManagerTest.java b/core/net/src/test/java/org/onosproject/net/topology/impl/TopologyManagerTest.java
index ddbecfa..d3bbd2b 100644
--- a/core/net/src/test/java/org/onosproject/net/topology/impl/TopologyManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/topology/impl/TopologyManagerTest.java
@@ -19,7 +19,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.onosproject.event.Event;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
 import org.onosproject.net.Link;
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
index e9fa53e..d429752 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
@@ -22,10 +22,7 @@
 import org.junit.Test;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.RoleInfo;
-import org.onosproject.event.DefaultEventSinkRegistry;
-import org.onosproject.event.Event;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.event.EventSink;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.event.ListenerRegistry;
 import org.onosproject.mastership.MastershipEvent;
 import org.onosproject.mastership.MastershipEvent.Type;
@@ -44,7 +41,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import static com.google.common.base.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -168,21 +164,4 @@
         }
     }
 
-
-    // code clone
-    /**
-     * Implements event delivery system that delivers events synchronously, or
-     * in-line with the post method invocation.
-     */
-    private static class TestEventDispatcher extends DefaultEventSinkRegistry
-            implements EventDeliveryService {
-
-        @SuppressWarnings({ "rawtypes", "unchecked" })
-        @Override
-        public void post(Event event) {
-            EventSink sink = getSink(event.getClass());
-            checkState(sink != null, "No sink for event %s", event);
-            sink.process(event);
-        }
-    }
 }