Log uncaught Exceptions
- Use execute instead of submit so that an uncaught Exception will
be handled and logged by the ExecutorService's handler (submit
captures it in the returned Future, which these callers discard).
- Pass each component's own logger to groupedThreads
Change-Id: I761264aea00748980929b5048e111756776dd2f6
diff --git a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
index ae5fb19..2d20e47 100644
--- a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
+++ b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
@@ -57,7 +57,7 @@
private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
private final ExecutorService executor =
- newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d"));
+ newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d", log));
@SuppressWarnings("unchecked")
private static final Event KILL_PILL = new AbstractEvent(null, 0) {
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 0adc1cb..6e0adbc 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -125,7 +125,7 @@
Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d", log));
protected ExecutorService operationsService =
- Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d, log"));
+ Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d", log));
private IdGenerator idGenerator;
@@ -294,7 +294,7 @@
@Override
public void apply(FlowRuleOperations ops) {
checkPermission(FLOWRULE_WRITE);
- operationsService.submit(new FlowOperationsProcessor(ops));
+ operationsService.execute(new FlowOperationsProcessor(ops));
}
@Override
@@ -623,14 +623,14 @@
final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
deviceId, id);
pendingFlowOperations.put(id, this);
- deviceInstallers.submit(() -> store.storeBatch(b));
+ deviceInstallers.execute(() -> store.storeBatch(b));
}
}
public void satisfy(DeviceId devId) {
pendingDevices.remove(devId);
if (pendingDevices.isEmpty()) {
- operationsService.submit(this);
+ operationsService.execute(this);
}
}
@@ -640,7 +640,7 @@
hasFailed.set(true);
pendingDevices.remove(devId);
if (pendingDevices.isEmpty()) {
- operationsService.submit(this);
+ operationsService.execute(this);
}
if (context != null) {
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index cac3717..b6c748a 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -134,7 +134,7 @@
@Activate
protected void activate() {
- executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d"));
+ executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d", log));
flowObjectiveStore.setDelegate(delegate);
mastershipService.addListener(mastershipListener);
deviceService.addListener(deviceListener);
@@ -191,7 +191,7 @@
//Attempts to check if pipeliner is null for retry attempts
} else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
Thread.sleep(INSTALL_RETRY_INTERVAL);
- executorService.submit(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
+ executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
} else {
// Otherwise we've tried a few times and failed, report an
// error back to the user.
@@ -208,7 +208,7 @@
@Override
public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
checkPermission(FLOWRULE_WRITE);
- executorService.submit(new ObjectiveInstaller(deviceId, filteringObjective));
+ executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
}
@Override
@@ -217,14 +217,14 @@
if (queueObjective(deviceId, forwardingObjective)) {
return;
}
- executorService.submit(new ObjectiveInstaller(deviceId, forwardingObjective));
+ executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
}
@Override
public void next(DeviceId deviceId, NextObjective nextObjective) {
checkPermission(FLOWRULE_WRITE);
nextToDevice.put(nextObjective.id(), deviceId);
- executorService.submit(new ObjectiveInstaller(deviceId, nextObjective));
+ executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
}
@Override
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java
index 5396e76..00b5d98 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java
@@ -201,7 +201,7 @@
}
} else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
Thread.sleep(INSTALL_RETRY_INTERVAL);
- executorService.submit(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
+ executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
} else {
// Otherwise we've tried a few times and failed, report an
// error back to the user.
@@ -221,7 +221,7 @@
List<FilteringObjective> filteringObjectives
= this.deviceCompositionTreeMap.get(deviceId).updateFilter(filteringObjective);
for (FilteringObjective tmp : filteringObjectives) {
- executorService.submit(new ObjectiveInstaller(deviceId, tmp));
+ executorService.execute(new ObjectiveInstaller(deviceId, tmp));
}
}
@@ -235,7 +235,7 @@
List<ForwardingObjective> forwardingObjectives
= this.deviceCompositionTreeMap.get(deviceId).updateForward(forwardingObjective);
for (ForwardingObjective tmp : forwardingObjectives) {
- executorService.submit(new ObjectiveInstaller(deviceId, tmp));
+ executorService.execute(new ObjectiveInstaller(deviceId, tmp));
}
}
@@ -245,7 +245,7 @@
List<NextObjective> nextObjectives = this.deviceCompositionTreeMap.get(deviceId).updateNext(nextObjective);
for (NextObjective tmp : nextObjectives) {
- executorService.submit(new ObjectiveInstaller(deviceId, tmp));
+ executorService.execute(new ObjectiveInstaller(deviceId, tmp));
}
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
index 82310fd..a85865e 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
@@ -90,7 +90,7 @@
@Activate
public void activate() {
cfgService.registerProperties(getClass());
- executor = newSingleThreadExecutor(groupedThreads("onos/intent", "cleanup"));
+ executor = newSingleThreadExecutor(groupedThreads("onos/intent", "cleanup", log));
timer = new Timer("onos-intent-cleanup-timer");
service.addListener(this);
adjustRate();
@@ -149,7 +149,7 @@
timerTask = new TimerTask() {
@Override
public void run() {
- executor.submit(IntentCleanup.this);
+ executor.execute(IntentCleanup.this);
}
};
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 45161f2..fc7e568 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -149,8 +149,8 @@
}
trackerService.setDelegate(topoDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
- batchExecutor = newSingleThreadExecutor(groupedThreads("onos/intent", "batch"));
- workerExecutor = newFixedThreadPool(numThreads, groupedThreads("onos/intent", "worker-%d"));
+ batchExecutor = newSingleThreadExecutor(groupedThreads("onos/intent", "batch", log));
+ workerExecutor = newFixedThreadPool(numThreads, groupedThreads("onos/intent", "worker-%d", log));
idGenerator = coreService.getIdGenerator("intent-ids");
Intent.bindIdGenerator(idGenerator);
log.info("Started");
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
index d889847..45a2af4 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
@@ -116,7 +116,7 @@
protected IntentPartitionService partitionService;
private ExecutorService executorService =
- newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker"));
+ newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker", log));
private ScheduledExecutorService executor = Executors
.newScheduledThreadPool(1);
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index b068270..8a74819 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -120,7 +120,7 @@
@Activate
public void activate() {
eventHandlingExecutor = Executors.newSingleThreadExecutor(
- groupedThreads("onos/net/packet", "event-handler"));
+ groupedThreads("onos/net/packet", "event-handler", log));
localNodeId = clusterService.getLocalNode().id();
appId = coreService.getAppId(CoreService.CORE_APP_NAME);
store.setDelegate(delegate);
diff --git a/core/net/src/main/java/org/onosproject/net/topology/impl/DefaultTopologyProvider.java b/core/net/src/main/java/org/onosproject/net/topology/impl/DefaultTopologyProvider.java
index 1fac99f..5adec7c 100644
--- a/core/net/src/main/java/org/onosproject/net/topology/impl/DefaultTopologyProvider.java
+++ b/core/net/src/main/java/org/onosproject/net/topology/impl/DefaultTopologyProvider.java
@@ -124,7 +124,7 @@
@Activate
public synchronized void activate(ComponentContext context) {
cfgService.registerProperties(DefaultTopologyProvider.class);
- executor = newFixedThreadPool(MAX_THREADS, groupedThreads("onos/topo", "build-%d"));
+ executor = newFixedThreadPool(MAX_THREADS, groupedThreads("onos/topo", "build-%d", log));
accumulator = new TopologyChangeAccumulator();
logConfig("Configured");
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index ea3e7eb..fb0626c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -107,9 +107,9 @@
private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-sender"));
+ groupedThreads("onos/cluster/membership", "heartbeat-sender", log));
private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
+ groupedThreads("onos/cluster/membership", "heartbeat-receiver", log));
private PhiAccrualFailureDetector failureDetector;
@@ -377,7 +377,7 @@
try {
ScheduledExecutorService prevSender = heartBeatSender;
heartBeatSender = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-sender-%d"));
+ groupedThreads("onos/cluster/membership", "heartbeat-sender-%d", log));
heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
heartbeatInterval, TimeUnit.MILLISECONDS);
prevSender.shutdown();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index 97a2c71..4d81eb0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -167,7 +167,7 @@
private ScheduledFuture<?> backupTask;
private final ScheduledExecutorService backupSenderExecutor =
- Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
+ Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
@@ -203,7 +203,7 @@
local = clusterService.getLocalNode().id();
messageHandlingExecutor = Executors.newFixedThreadPool(
- msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
+ msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
registerMessageHandlers(messageHandlingExecutor);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index a28e4ee..a142e5f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -185,7 +185,8 @@
messageHandlingExecutor = Executors.
newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/group",
- "message-handlers"));
+ "message-handlers",
+ log));
clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
clusterMsgSerializer::deserialize,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index f3bd0b5..7e63d89 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -118,7 +118,7 @@
public void activate() {
messageHandlingExecutor = Executors.newFixedThreadPool(
messageHandlerThreadPoolSize,
- groupedThreads("onos/store/packet", "message-handlers"));
+ groupedThreads("onos/store/packet", "message-handlers", log));
communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
SERIALIZER::decode,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java b/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
index 0b35e3d..99f46cf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
@@ -76,7 +76,7 @@
private Map<HostId, ArpResponseMessage> pendingMessages = Maps.newConcurrentMap();
private ExecutorService executor =
- newFixedThreadPool(4, groupedThreads("onos/arp", "sender-%d"));
+ newFixedThreadPool(4, groupedThreads("onos/arp", "sender-%d", log));
private NodeId localNodeId;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index b9e240d..d96002e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -120,7 +120,7 @@
messageHandlingExecutor = Executors.newFixedThreadPool(
messageHandlerThreadPoolSize,
- groupedThreads("onos/store/statistic", "message-handlers"));
+ groupedThreads("onos/store/statistic", "message-handlers", log));
clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
SERIALIZER::decode,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index f4bb0d0..2f5bc0a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -218,7 +218,7 @@
} else {
// should be a normal executor; it's used for receiving messages
this.executor =
- Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
+ Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d", log));
}
if (communicationExecutor != null) {
@@ -227,7 +227,7 @@
// sending executor; should be capped
//TODO this probably doesn't need to be bounded anymore
this.communicationExecutor =
- newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
+ newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d", log));
}
@@ -235,7 +235,7 @@
this.backgroundExecutor = backgroundExecutor;
} else {
this.backgroundExecutor =
- newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+ newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d", log));
}
// start anti-entropy thread
@@ -718,7 +718,7 @@
Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
items.forEach(item -> map.compute(item.key(), (key, existing) ->
item.isNewerThan(existing) ? item : existing));
- communicationExecutor.submit(() -> {
+ communicationExecutor.execute(() -> {
clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
updateMessageSubject,
serializer::encode,