Send/receive heartbeats on shared thread to avoid concurrent updates to ClusterStore state when sending/receiving heartbeats

Change-Id: I7832e840d0364788159a9acf0d070c3bb4255d68
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index dbdbabe..306ebcb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -17,7 +17,6 @@
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -29,8 +28,8 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.packet.IpAddress;
 import org.onlab.util.KryoNamespace;
-import org.onosproject.cfg.ConfigProperty;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cfg.ConfigProperty;
 import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ClusterStore;
@@ -53,9 +52,9 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
@@ -113,10 +112,9 @@
     private final Map<NodeId, Version> nodeVersions = Maps.newConcurrentMap();
     private final Map<NodeId, Instant> nodeLastUpdatedTimes = Maps.newConcurrentMap();
 
-    private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
-            groupedThreads("onos/cluster/membership", "heartbeat-sender", log));
-    private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
-            groupedThreads("onos/cluster/membership", "heartbeat-receiver", log));
+    private ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(
+            groupedThreads("onos/cluster/membership", "heartbeat", log));
+    private ScheduledFuture<?> heartbeatFuture;
 
     private PhiAccrualFailureDetector failureDetector;
 
@@ -170,12 +168,11 @@
         localVersion = versionService.version();
         nodeVersions.put(localNode.id(), localVersion);
 
-        messagingService.registerHandler(HEARTBEAT_MESSAGE,
-                new HeartbeatMessageHandler(), heartBeatMessageHandler);
+        messagingService.registerHandler(HEARTBEAT_MESSAGE, new HeartbeatMessageHandler(), heartbeatScheduler);
 
         failureDetector = new PhiAccrualFailureDetector(minStandardDeviationMillis);
 
-        heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
+        heartbeatFuture = heartbeatScheduler.scheduleWithFixedDelay(this::heartbeat, 0,
                 heartbeatInterval, TimeUnit.MILLISECONDS);
 
         log.info("Started");
@@ -184,8 +181,7 @@
     @Deactivate
     public void deactivate() {
         messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
-        heartBeatSender.shutdownNow();
-        heartBeatMessageHandler.shutdownNow();
+        heartbeatScheduler.shutdownNow();
 
         log.info("Stopped");
     }
@@ -475,12 +471,11 @@
      */
     private void restartHeartbeatSender() {
         try {
-            ScheduledExecutorService prevSender = heartBeatSender;
-            heartBeatSender = Executors.newSingleThreadScheduledExecutor(
-                    groupedThreads("onos/cluster/membership", "heartbeat-sender-%d", log));
-            heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
+            if (heartbeatFuture != null) {
+                heartbeatFuture.cancel(false);
+            }
+            heartbeatFuture = heartbeatScheduler.scheduleWithFixedDelay(this::heartbeat, 0,
                     heartbeatInterval, TimeUnit.MILLISECONDS);
-            prevSender.shutdown();
         } catch (Exception e) {
             log.warn(e.getMessage());
         }