Made time limit for event processing configurable; cleaned-up duplicate code.
Change-Id: I08e7f1c9f4cdbd6404f1eb5e3544989e7a728d92
diff --git a/core/api/src/main/java/org/onosproject/event/EventDeliveryService.java b/core/api/src/main/java/org/onosproject/event/EventDeliveryService.java
index 2d9d38ce..ff26893 100644
--- a/core/api/src/main/java/org/onosproject/event/EventDeliveryService.java
+++ b/core/api/src/main/java/org/onosproject/event/EventDeliveryService.java
@@ -20,4 +20,19 @@
* then dispatching them to the appropriate event sink.
*/
public interface EventDeliveryService extends EventDispatcher, EventSinkRegistry {
+
+ /**
+ * Sets the number of millis that an event sink has to process an event.
+ *
+ * @param millis number of millis allowed per sink per event
+ */
+ void setDispatchTimeLimit(long millis);
+
+ /**
+ * Returns the number of millis that an event sink has to process an event.
+ *
+ * @return number of millis allowed per sink per event
+ */
+ long getDispatchTimeLimit();
+
}
diff --git a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java b/core/common/src/test/java/org/onosproject/common/event/impl/TestEventDispatcher.java
similarity index 87%
rename from core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
rename to core/common/src/test/java/org/onosproject/common/event/impl/TestEventDispatcher.java
index 91a3e53..4ea371a 100644
--- a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
+++ b/core/common/src/test/java/org/onosproject/common/event/impl/TestEventDispatcher.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.event.impl;
+package org.onosproject.common.event.impl;
import org.onosproject.event.DefaultEventSinkRegistry;
import org.onosproject.event.Event;
@@ -37,4 +37,12 @@
sink.process(event);
}
+ @Override
+ public void setDispatchTimeLimit(long millis) {
+ }
+
+ @Override
+ public long getDispatchTimeLimit() {
+ return 0;
+ }
}
diff --git a/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java b/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
index 8e0de97..5c78bb4 100644
--- a/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
+++ b/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
@@ -17,12 +17,12 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
@@ -32,9 +32,11 @@
import org.onosproject.core.IdBlockStore;
import org.onosproject.core.IdGenerator;
import org.onosproject.core.Version;
+import org.onosproject.event.EventDeliveryService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.util.Dictionary;
import java.util.List;
@@ -64,9 +66,18 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService cfgService;
- @Property(name = "sharedThreadPoolSize", intValue = 30,
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDeliveryService;
+
+ private static final int DEFAULT_POOL_SIZE = 30;
+ @Property(name = "sharedThreadPoolSize", intValue = DEFAULT_POOL_SIZE,
label = "Configure shared pool maximum size ")
- private int sharedThreadPoolSize = 30;
+ private int sharedThreadPoolSize = DEFAULT_POOL_SIZE;
+
+ private static final int DEFAULT_EVENT_TIME = 2000;
+ @Property(name = "maxEventTimeLimit", intValue = DEFAULT_EVENT_TIME,
+ label = "Maximum number of millis an event sink has to process an event")
+ private int maxEventTimeLimit = DEFAULT_EVENT_TIME;
@Activate
public void activate() {
@@ -121,30 +132,33 @@
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context.getProperties();
- Integer sharedThreadPoolSizeConfig =
- getIntegerProperty(properties, "sharedThreadPoolSize");
- if (sharedThreadPoolSizeConfig == null) {
- log.info("Shared Pool Size is not configured, default value is {}",
- sharedThreadPoolSize);
- } else {
- if (sharedThreadPoolSizeConfig > 0) {
- sharedThreadPoolSize = sharedThreadPoolSizeConfig;
- SharedExecutors.setPoolSize(sharedThreadPoolSize);
- log.info("Configured. Shared Pool Size is configured to {}",
- sharedThreadPoolSize);
- } else {
- log.warn("Shared Pool Size size must be greater than 0");
- }
- }
- }
+ Integer poolSize = getIntegerProperty(properties, "sharedThreadPoolSize");
+ if (poolSize != null && poolSize > 1) {
+ sharedThreadPoolSize = poolSize;
+ SharedExecutors.setPoolSize(sharedThreadPoolSize);
+ } else if (poolSize != null) {
+ log.warn("sharedThreadPoolSize must be greater than 1");
+ }
+
+ Integer timeLimit = getIntegerProperty(properties, "maxEventTimeLimit");
+ if (timeLimit != null && timeLimit > 1) {
+ maxEventTimeLimit = timeLimit;
+ eventDeliveryService.setDispatchTimeLimit(maxEventTimeLimit);
+ } else if (timeLimit != null) {
+ log.warn("maxEventTimeLimit must be greater than 1");
+ }
+
+ log.info("Settings: sharedThreadPoolSize={}, maxEventTimeLimit={}",
+ sharedThreadPoolSize, maxEventTimeLimit);
+ }
/**
* Get Integer property from the propertyName
* Return null if propertyName is not found.
*
- * @param properties properties to be looked up
+ * @param properties properties to be looked up
* @param propertyName the name of the property to look up
* @return value when the propertyName is defined or return null
*/
diff --git a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
index 382846c..6d97460 100644
--- a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
+++ b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
@@ -33,6 +33,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -45,12 +46,12 @@
public class CoreEventDispatcher extends DefaultEventSinkRegistry
implements EventDeliveryService {
- // Maximum number of millis a sink can take to process an event.
- private static final long MAX_EXECUTE_MS = 1_000;
- private static final long WATCHDOG_MS = MAX_EXECUTE_MS / 4;
-
private final Logger log = getLogger(getClass());
+ // Default number of millis a sink can take to process an event.
+ private static final long DEFAULT_EXECUTE_MS = 2_000; // ms
+ private static final long WATCHDOG_MS = 250; // ms
+
private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
private final ExecutorService executor =
@@ -61,6 +62,7 @@
};
private DispatchLoop dispatchLoop;
+ private long maxProcessMillis = DEFAULT_EXECUTE_MS;
// Means to detect long-running sinks
private TimerTask watchdog;
@@ -92,6 +94,18 @@
log.info("Stopped");
}
+ @Override
+ public void setDispatchTimeLimit(long millis) {
+ checkArgument(millis >= WATCHDOG_MS,
+ "Time limit must be greater than %s", WATCHDOG_MS);
+ maxProcessMillis = millis;
+ }
+
+ @Override
+ public long getDispatchTimeLimit() {
+ return maxProcessMillis;
+ }
+
// Auxiliary event dispatching loop that feeds off the events queue.
private class DispatchLoop implements Runnable {
private volatile boolean stopped;
@@ -126,7 +140,7 @@
lastStart = 0;
} else {
log.warn("No sink registered for event class {}",
- event.getClass());
+ event.getClass().getName());
}
}
@@ -140,7 +154,7 @@
@Override
public void run() {
long delta = System.currentTimeMillis() - lastStart;
- if (lastStart > 0 && delta > MAX_EXECUTE_MS) {
+ if (lastStart > 0 && delta > maxProcessMillis) {
log.error("Event sink {} exceeded execution time limit: {} ms",
lastSink.getClass().getName(), delta);
diff --git a/core/net/src/test/java/org/onosproject/app/impl/ApplicationManagerTest.java b/core/net/src/test/java/org/onosproject/app/impl/ApplicationManagerTest.java
index 0be1ea8..4b4bbee 100644
--- a/core/net/src/test/java/org/onosproject/app/impl/ApplicationManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/app/impl/ApplicationManagerTest.java
@@ -28,7 +28,7 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.DefaultApplication;
import org.onosproject.core.DefaultApplicationId;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import java.io.InputStream;
import java.net.URI;
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
index b5ae31b..e4d4331 100644
--- a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
@@ -26,7 +26,7 @@
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipStore;
import org.onosproject.mastership.MastershipTermService;
diff --git a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
index d6bfda2..0b6c3e0 100644
--- a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
@@ -31,7 +31,7 @@
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Event;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.mastership.MastershipTermService;
diff --git a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
index c8a32f0..1113b2b 100644
--- a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
@@ -29,7 +29,7 @@
import org.onosproject.core.CoreServiceAdapter;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.IdGenerator;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.DefaultDevice;
import org.onosproject.net.Device;
import org.onosproject.net.Device.Type;
diff --git a/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java b/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
index 6b5929a..43ce78a 100644
--- a/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
@@ -35,7 +35,7 @@
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.DefaultGroupId;
import org.onosproject.core.GroupId;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.impl.DeviceManager;
diff --git a/core/net/src/test/java/org/onosproject/net/host/impl/HostManagerTest.java b/core/net/src/test/java/org/onosproject/net/host/impl/HostManagerTest.java
index e28f193..a6e8e62 100644
--- a/core/net/src/test/java/org/onosproject/net/host/impl/HostManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/host/impl/HostManagerTest.java
@@ -36,7 +36,7 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onosproject.event.Event;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
index 66bf476..d48d107 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
@@ -30,7 +30,7 @@
import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.impl.TestCoreManager;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.NetworkResource;
import org.onosproject.net.intent.FlowRuleIntent;
import org.onosproject.net.intent.Intent;
diff --git a/core/net/src/test/java/org/onosproject/net/link/impl/LinkManagerTest.java b/core/net/src/test/java/org/onosproject/net/link/impl/LinkManagerTest.java
index ac6d1eb..cacf082 100644
--- a/core/net/src/test/java/org/onosproject/net/link/impl/LinkManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/link/impl/LinkManagerTest.java
@@ -37,7 +37,7 @@
import org.onosproject.net.link.LinkService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.device.impl.DeviceManager;
import org.onosproject.store.trivial.impl.SimpleLinkStore;
diff --git a/core/net/src/test/java/org/onosproject/net/topology/impl/DefaultTopologyProviderTest.java b/core/net/src/test/java/org/onosproject/net/topology/impl/DefaultTopologyProviderTest.java
index 91d0531..a54d075 100644
--- a/core/net/src/test/java/org/onosproject/net/topology/impl/DefaultTopologyProviderTest.java
+++ b/core/net/src/test/java/org/onosproject/net/topology/impl/DefaultTopologyProviderTest.java
@@ -22,7 +22,7 @@
import org.junit.Test;
import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.event.Event;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.Device;
import org.onosproject.net.Link;
import org.onosproject.net.device.DeviceEvent;
diff --git a/core/net/src/test/java/org/onosproject/net/topology/impl/TopologyManagerTest.java b/core/net/src/test/java/org/onosproject/net/topology/impl/TopologyManagerTest.java
index ddbecfa..d3bbd2b 100644
--- a/core/net/src/test/java/org/onosproject/net/topology/impl/TopologyManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/topology/impl/TopologyManagerTest.java
@@ -19,7 +19,7 @@
import org.junit.Before;
import org.junit.Test;
import org.onosproject.event.Event;
-import org.onosproject.event.impl.TestEventDispatcher;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.Link;
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
index e9fa53e..d429752 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
@@ -22,10 +22,7 @@
import org.junit.Test;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
-import org.onosproject.event.DefaultEventSinkRegistry;
-import org.onosproject.event.Event;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.event.EventSink;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipEvent.Type;
@@ -44,7 +41,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import static com.google.common.base.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -168,21 +164,4 @@
}
}
-
- // code clone
- /**
- * Implements event delivery system that delivers events synchronously, or
- * in-line with the post method invocation.
- */
- private static class TestEventDispatcher extends DefaultEventSinkRegistry
- implements EventDeliveryService {
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public void post(Event event) {
- EventSink sink = getSink(event.getClass());
- checkState(sink != null, "No sink for event %s", event);
- sink.process(event);
- }
- }
}