Improve Executor-related logging behavior
- Specify Logger for the Executor
- Use Executor#execute instead of ExecutorService#submit for
fire-and-forget usage.
Note: submit() captures a thrown exception in the returned Future, so
it is silently lost unless the Future is inspected.
Change-Id: I507b841dc3feedf4ad20a746c304518d68fb846a
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 51f759a..36699b0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -68,6 +68,7 @@
import static com.google.common.collect.Multimaps.newSetMultimap;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.io.ByteStreams.toByteArray;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.randomDelay;
@@ -138,10 +139,10 @@
.register(MultiValuedTimestamp.class)
.register(InternalState.class);
- executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
+ executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
messageHandlingExecutor = Executors.newSingleThreadExecutor(
- groupedThreads("onos/store/app", "message-handler"));
+ groupedThreads("onos/store/app", "message-handler", log));
clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
bytes -> new String(bytes, Charsets.UTF_8),
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index a10b79d..e5f40b3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -96,13 +96,13 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.notNull;
import static com.google.common.base.Verify.verify;
+import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.util.Tools.groupedThreads;
@@ -200,10 +200,10 @@
@Activate
public void activate() {
- executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
+ executor = newCachedThreadPool(groupedThreads("onos/device", "fg-%d", log));
backgroundExecutor =
- newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
+ newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d", log)));
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index a2b9995..7b94670 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -305,7 +305,7 @@
msgHandlerPoolSize = newPoolSize;
ExecutorService oldMsgHandler = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(
- msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
+ msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
// replace previously registered handlers.
registerMessageHandlers(messageHandlingExecutor);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index bd5eea5..01f3cb2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -123,10 +123,10 @@
public void activate() {
messageHandlingExecutor =
Executors.newSingleThreadExecutor(
- groupedThreads("onos/store/device/mastership", "message-handler"));
+ groupedThreads("onos/store/device/mastership", "message-handler", log));
transferExecutor =
Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/device/mastership", "mastership-transfer-executor"));
+ groupedThreads("onos/store/device/mastership", "mastership-transfer-executor", log));
clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
SERIALIZER::decode,
this::relinquishLocalRole,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
index 8e59ef6..c3f907a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
@@ -118,7 +118,7 @@
messageHandlingExecutor = Executors.newFixedThreadPool(
messageHandlerThreadPoolSize,
- groupedThreads("onos/store/statistic", "message-handlers"));
+ groupedThreads("onos/store/statistic", "message-handlers", log));
clusterCommunicator.addSubscriber(
GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
@@ -200,6 +200,7 @@
previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
}
+ @Override
public synchronized void updateFlowStatistic(FlowEntry rule) {
ConnectPoint cp = buildConnectPoint(rule);
if (cp == null) {