Fix more thread leaks
Change-Id: I573c30cfb90de485b5ed0eecb9d79ac567e4acbd
diff --git a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
index d90a938..97ac8cb 100644
--- a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
+++ b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
@@ -47,10 +47,12 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
+import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.d.config.DeviceResourceIds.DCS_NAMESPACE;
import static org.slf4j.LoggerFactory.getLogger;
@@ -67,12 +69,15 @@
private final Logger log = getLogger(getClass());
private final DynamicConfigStoreDelegate storeDelegate = new InternalStoreDelegate();
+ private ExecutorService executor;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DynamicConfigStore store;
private ConcurrentHashMap<String, RpcService> handlerRegistry = new ConcurrentHashMap<>();
@Activate
public void activate() {
+ executor = Executors.newSingleThreadExecutor(groupedThreads("onos/dyncfg", "exec", log));
initStore();
store.setDelegate(storeDelegate);
eventDispatcher.addSink(DynamicConfigEvent.class, listenerRegistry);
@@ -106,6 +111,7 @@
@Deactivate
public void deactivate() {
+ executor.shutdown();
store.unsetDelegate(storeDelegate);
eventDispatcher.removeSink(DynamicConfigEvent.class);
handlerRegistry.clear();
@@ -197,7 +203,7 @@
}
return CompletableFuture.supplyAsync(
new RpcExecutor(handler, getSvcId(handler, ctxt[0]), ctxt[1], RpcMessageId.generate(), input),
- Executors.newSingleThreadExecutor());
+ executor);
}
/**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
index bc7f197..a0e608b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
@@ -65,8 +65,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -108,6 +106,8 @@
new PendingHostListener();
private ScheduledExecutorService executor;
+ private ScheduledExecutorService cacheCleaner;
+ private ScheduledExecutorService locationRemover;
private Consumer<Status> statusChangeListener;
@@ -134,8 +134,6 @@
}
}).build();
- private ScheduledExecutorService cacheCleaner = Executors.newSingleThreadScheduledExecutor();
-
@Activate
public void activate() {
KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
@@ -165,10 +163,12 @@
.build()
.asAtomicCounter();
- cacheCleaner.scheduleAtFixedRate(pendingHostsCache::cleanUp, 0,
- PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "cache-cleaner", log));
+ cacheCleaner.scheduleAtFixedRate(pendingHostsCache::cleanUp, 0, PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "store", log));
+ locationRemover = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "loc-remover", log));
+
+ executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "status-listener", log));
statusChangeListener = status -> {
if (status == Status.ACTIVE) {
executor.execute(this::loadHostsByIp);
@@ -184,6 +184,8 @@
hostsConsistentMap.removeListener(hostLocationTracker);
cacheCleaner.shutdown();
+ locationRemover.shutdown();
+ executor.shutdown();
log.info("Stopped");
}
@@ -545,7 +547,6 @@
case UPDATE:
// Remove the host location if probe timeout in VERIFY mode
if (newValue.value().expired() && newValue.value().probeMode() == ProbeMode.VERIFY) {
- Executor locationRemover = Executors.newSingleThreadScheduledExecutor();
locationRemover.execute(() -> {
pendingHosts.remove(event.key());
removeLocation(newValue.value().hostId(),