Fix more thread leaks

Change-Id: I573c30cfb90de485b5ed0eecb9d79ac567e4acbd
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
index 8dc1613..b9f7a7d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
@@ -65,8 +65,6 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -108,6 +106,8 @@
             new PendingHostListener();
 
     private ScheduledExecutorService executor;
+    private ScheduledExecutorService cacheCleaner;
+    private ScheduledExecutorService locationRemover;
 
     private Consumer<Status> statusChangeListener;
 
@@ -134,8 +134,6 @@
                 }
             }).build();
 
-    private ScheduledExecutorService cacheCleaner = Executors.newSingleThreadScheduledExecutor();
-
     @Activate
     public void activate() {
         KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
@@ -165,10 +163,12 @@
             .build()
             .asAtomicCounter();
 
-        cacheCleaner.scheduleAtFixedRate(pendingHostsCache::cleanUp, 0,
-                PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "cache-cleaner", log));
+        cacheCleaner.scheduleAtFixedRate(pendingHostsCache::cleanUp, 0, PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
 
-        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "store", log));
+        locationRemover = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "loc-remover", log));
+
+        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "status-listener", log));
         statusChangeListener = status -> {
             if (status == Status.ACTIVE) {
                 executor.execute(this::loadHostsByIp);
@@ -184,6 +184,8 @@
         hostsConsistentMap.removeListener(hostLocationTracker);
 
         cacheCleaner.shutdown();
+        locationRemover.shutdown();
+        executor.shutdown();
 
         log.info("Stopped");
     }
@@ -549,7 +551,6 @@
                 case UPDATE:
                     // Remove the host location if probe timeout in VERIFY mode
                     if (newValue.value().expired() && newValue.value().probeMode() == ProbeMode.VERIFY) {
-                        Executor locationRemover = Executors.newSingleThreadScheduledExecutor();
                         locationRemover.execute(() -> {
                             pendingHosts.remove(event.key());
                             removeLocation(newValue.value().hostId(),