Fix more thread leaks

Change-Id: I573c30cfb90de485b5ed0eecb9d79ac567e4acbd
diff --git a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
index d90a938..97ac8cb 100644
--- a/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
+++ b/apps/config/src/main/java/org/onosproject/config/impl/DynamicConfigManager.java
@@ -47,10 +47,12 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.slf4j.Logger;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.d.config.DeviceResourceIds.DCS_NAMESPACE;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -67,12 +69,15 @@
     private final Logger log = getLogger(getClass());
     private final DynamicConfigStoreDelegate storeDelegate = new InternalStoreDelegate();
 
+    private ExecutorService executor;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DynamicConfigStore store;
     private ConcurrentHashMap<String, RpcService> handlerRegistry = new ConcurrentHashMap<>();
 
     @Activate
     public void activate() {
+        executor = Executors.newSingleThreadExecutor(groupedThreads("onos/dyncfg", "exec", log));
         initStore();
         store.setDelegate(storeDelegate);
         eventDispatcher.addSink(DynamicConfigEvent.class, listenerRegistry);
@@ -106,6 +111,7 @@
 
     @Deactivate
     public void deactivate() {
+        executor.shutdown();
         store.unsetDelegate(storeDelegate);
         eventDispatcher.removeSink(DynamicConfigEvent.class);
         handlerRegistry.clear();
@@ -197,7 +203,7 @@
         }
         return CompletableFuture.supplyAsync(
                 new RpcExecutor(handler, getSvcId(handler, ctxt[0]), ctxt[1], RpcMessageId.generate(), input),
-                Executors.newSingleThreadExecutor());
+                executor);
     }
 
     /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
index bc7f197..a0e608b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
@@ -65,8 +65,6 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -108,6 +106,8 @@
             new PendingHostListener();
 
     private ScheduledExecutorService executor;
+    private ScheduledExecutorService cacheCleaner;
+    private ScheduledExecutorService locationRemover;
 
     private Consumer<Status> statusChangeListener;
 
@@ -134,8 +134,6 @@
                 }
             }).build();
 
-    private ScheduledExecutorService cacheCleaner = Executors.newSingleThreadScheduledExecutor();
-
     @Activate
     public void activate() {
         KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
@@ -165,10 +163,12 @@
             .build()
             .asAtomicCounter();
 
-        cacheCleaner.scheduleAtFixedRate(pendingHostsCache::cleanUp, 0,
-                PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "cache-cleaner", log));
+        cacheCleaner.scheduleAtFixedRate(pendingHostsCache::cleanUp, 0, PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
 
-        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "store", log));
+        locationRemover = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "loc-remover", log));
+
+        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "status-listener", log));
         statusChangeListener = status -> {
             if (status == Status.ACTIVE) {
                 executor.execute(this::loadHostsByIp);
@@ -184,6 +184,8 @@
         hostsConsistentMap.removeListener(hostLocationTracker);
 
         cacheCleaner.shutdown();
+        locationRemover.shutdown();
+        executor.shutdown();
 
         log.info("Stopped");
     }
@@ -545,7 +547,6 @@
                 case UPDATE:
                     // Remove the host location if probe timeout in VERIFY mode
                     if (newValue.value().expired() && newValue.value().probeMode() == ProbeMode.VERIFY) {
-                        Executor locationRemover = Executors.newSingleThreadScheduledExecutor();
                         locationRemover.execute(() -> {
                             pendingHosts.remove(event.key());
                             removeLocation(newValue.value().hostId(),