Add SharedScheduledExecutors for ease of using ScheduledExecutors
- LogScheduledExecutorService -> SharedScheduledExecutorService
- Add a utility classs for SharedScheduledExecutorService
- Add unit test for SharedScheduledExecutors
- Revise the control message provider to use
SharedScheduledExecutorService
Change-Id: Ia4dea245543b4751e6edcce1aaab4991d897cc77
diff --git a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
index a8c2ef4..10b10ba 100644
--- a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
+++ b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
@@ -22,6 +22,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.metrics.MetricsService;
+import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.cpman.message.ControlMessageProvider;
import org.onosproject.cpman.message.ControlMessageProviderRegistry;
import org.onosproject.cpman.message.ControlMessageProviderService;
@@ -43,9 +44,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.loggableScheduledExecutor;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.openflow.controller.Dpid.uri;
import static org.slf4j.LoggerFactory.getLogger;
@@ -106,9 +104,7 @@
// listens all OpenFlow outgoing message events
controller.getSwitches().forEach(sw -> sw.addEventListener(outMsgListener));
- executor = loggableScheduledExecutor(
- newSingleThreadScheduledExecutor(groupedThreads("onos/provider",
- "aggregator")));
+ executor = SharedScheduledExecutors.getSingleThreadExecutor();
connectInitialDevices();
log.info("Started");
diff --git a/utils/misc/src/main/java/org/onlab/util/LogScheduledExecutorService.java b/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutorService.java
similarity index 96%
rename from utils/misc/src/main/java/org/onlab/util/LogScheduledExecutorService.java
rename to utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutorService.java
index 257f98e..e2b9b6f 100644
--- a/utils/misc/src/main/java/org/onlab/util/LogScheduledExecutorService.java
+++ b/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutorService.java
@@ -16,6 +16,7 @@
package org.onlab.util;
+import com.google.common.base.Throwables;
import org.slf4j.Logger;
import java.util.Collection;
@@ -33,7 +34,7 @@
/**
* A new scheduled executor service that does not eat exception.
*/
-class LogScheduledExecutorService implements ScheduledExecutorService {
+class SharedScheduledExecutorService implements ScheduledExecutorService {
private static final String NOT_ALLOWED = "Shutdown of scheduled executor is not allowed";
private final Logger log = getLogger(getClass());
@@ -45,7 +46,7 @@
*
* @param executor executor service to wrap
*/
- LogScheduledExecutorService(ScheduledExecutorService executor) {
+ SharedScheduledExecutorService(ScheduledExecutorService executor) {
this.executor = executor;
}
@@ -191,7 +192,7 @@
runnable.run();
} catch (Exception e) {
log.error("Uncaught exception on " + runnable.getClass().getSimpleName(), e);
- throw new RuntimeException(e);
+ throw Throwables.propagate(e);
}
}
}
diff --git a/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutors.java b/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutors.java
new file mode 100644
index 0000000..5a2f85c
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutors.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+
+/**
+ * Utility for managing a set of shared execution resources, such as a single
+ * thread scheduled executor and thread pool scheduled executor for use by
+ * various parts of the platform or by applications.
+ * <p>
+ * Whenever possible, use of these shared resources is encouraged over creating
+ * separate ones.
+ * </p>
+ */
+public final class SharedScheduledExecutors {
+
+ public static final int DEFAULT_POOL_SIZE = 30;
+
+ private static SharedScheduledExecutorService singleThreadExecutor =
+ new SharedScheduledExecutorService(
+ newSingleThreadScheduledExecutor(
+ groupedThreads("onos/shared/scheduled",
+ "onos-single-executor")));
+
+ private static SharedScheduledExecutorService poolThreadExecutor =
+ new SharedScheduledExecutorService(
+ newScheduledThreadPool(DEFAULT_POOL_SIZE,
+ groupedThreads("onos/shared/scheduled",
+ "onos-pool-executor-%d")));
+
+ // Ban public construction
+ private SharedScheduledExecutors() {
+ }
+
+ /**
+ * Returns the shared scheduled single thread executor.
+ *
+ * @return shared scheduled single thread executor
+ */
+ public static ScheduledExecutorService getSingleThreadExecutor() {
+ return singleThreadExecutor;
+ }
+
+ /**
+ * Returns the shared scheduled thread pool executor.
+ *
+ * @return shared scheduled executor pool
+ */
+ public static ScheduledExecutorService getPoolThreadExecutor() {
+ return poolThreadExecutor;
+ }
+
+ /**
+ * Configures the shared scheduled thread pool size.
+ *
+ * @param poolSize new pool size
+ */
+ public static void setPoolSize(int poolSize) {
+ checkArgument(poolSize > 0, "Shared pool size size must be greater than 0");
+ poolThreadExecutor.setBackingExecutor(
+ newScheduledThreadPool(poolSize, groupedThreads("onos/shared/scheduled",
+ "onos-pool-executor-%d")));
+ }
+
+ /**
+ * Shuts down all shared scheduled executors.
+ * This is not intended to be called by application directly.
+ */
+ public static void shutdown() {
+ singleThreadExecutor.backingExecutor().shutdown();
+ poolThreadExecutor.backingExecutor().shutdown();
+ }
+}
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index e9c9bc3..e13fb2b 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -15,10 +15,11 @@
*/
package org.onlab.util;
-import static java.nio.file.Files.delete;
-import static java.nio.file.Files.walkFileTree;
-import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
-import static org.slf4j.LoggerFactory.getLogger;
+import com.google.common.base.Charsets;
+import com.google.common.base.Strings;
+import com.google.common.primitives.UnsignedLongs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
import java.io.BufferedReader;
import java.io.File;
@@ -45,7 +46,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -55,12 +55,10 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import org.slf4j.Logger;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Strings;
-import com.google.common.primitives.UnsignedLongs;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static java.nio.file.Files.delete;
+import static java.nio.file.Files.walkFileTree;
+import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Miscellaneous utility methods.
@@ -130,17 +128,6 @@
}
/**
- * Returns a loggable scheduled executor service that allows to capture and
- * log any exceptions if the scheduled tasks are failed during execution.
- *
- * @param executor scheduled executor service
- * @return loggable scheduled executor service
- */
- public static ScheduledExecutorService loggableScheduledExecutor(ScheduledExecutorService executor) {
- return new LogScheduledExecutorService(executor);
- }
-
- /**
* Returns a thread factory that produces threads with MIN_PRIORITY.
*
* @param factory backing ThreadFactory
diff --git a/utils/misc/src/test/java/org/onlab/util/SharedScheduledExecutorsTest.java b/utils/misc/src/test/java/org/onlab/util/SharedScheduledExecutorsTest.java
new file mode 100644
index 0000000..7989aeb
--- /dev/null
+++ b/utils/misc/src/test/java/org/onlab/util/SharedScheduledExecutorsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests of the SharedScheduledExecutors.
+ */
+public class SharedScheduledExecutorsTest {
+
+ @Test
+ public void singleThread() {
+ ScheduledExecutorService a = SharedScheduledExecutors.getSingleThreadExecutor();
+ assertNotNull("ScheduledExecutorService must not be null", a);
+ ScheduledExecutorService b = SharedScheduledExecutors.getSingleThreadExecutor();
+ assertSame("factories should be same", a, b);
+ }
+
+ @Test
+ public void poolThread() {
+ ScheduledExecutorService a = SharedScheduledExecutors.getPoolThreadExecutor();
+ assertNotNull("ScheduledExecutorService must not be null", a);
+ ScheduledExecutorService b = SharedScheduledExecutors.getPoolThreadExecutor();
+ assertSame("factories should be same", a, b);
+ }
+}