Send/receive heartbeats on shared thread to avoid concurrent updates to ClusterStore state when sending/receiving heartbeats
Change-Id: I7832e840d0364788159a9acf0d070c3bb4255d68
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index dbdbabe..306ebcb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -17,7 +17,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -29,8 +28,8 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
-import org.onosproject.cfg.ConfigProperty;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cfg.ConfigProperty;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterStore;
@@ -53,9 +52,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -113,10 +112,9 @@
private final Map<NodeId, Version> nodeVersions = Maps.newConcurrentMap();
private final Map<NodeId, Instant> nodeLastUpdatedTimes = Maps.newConcurrentMap();
- private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-sender", log));
- private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-receiver", log));
+ private ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/cluster/membership", "heartbeat", log));
+ private ScheduledFuture<?> heartbeatFuture;
private PhiAccrualFailureDetector failureDetector;
@@ -170,12 +168,11 @@
localVersion = versionService.version();
nodeVersions.put(localNode.id(), localVersion);
- messagingService.registerHandler(HEARTBEAT_MESSAGE,
- new HeartbeatMessageHandler(), heartBeatMessageHandler);
+ messagingService.registerHandler(HEARTBEAT_MESSAGE, new HeartbeatMessageHandler(), heartbeatScheduler);
failureDetector = new PhiAccrualFailureDetector(minStandardDeviationMillis);
- heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
+ heartbeatFuture = heartbeatScheduler.scheduleWithFixedDelay(this::heartbeat, 0,
heartbeatInterval, TimeUnit.MILLISECONDS);
log.info("Started");
@@ -184,8 +181,7 @@
@Deactivate
public void deactivate() {
messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
- heartBeatSender.shutdownNow();
- heartBeatMessageHandler.shutdownNow();
+ heartbeatScheduler.shutdownNow();
log.info("Stopped");
}
@@ -475,12 +471,11 @@
*/
private void restartHeartbeatSender() {
try {
- ScheduledExecutorService prevSender = heartBeatSender;
- heartBeatSender = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-sender-%d", log));
- heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
+ if (heartbeatFuture != null) {
+ heartbeatFuture.cancel(false);
+ }
+ heartbeatFuture = heartbeatScheduler.scheduleWithFixedDelay(this::heartbeat, 0,
heartbeatInterval, TimeUnit.MILLISECONDS);
- prevSender.shutdown();
} catch (Exception e) {
log.warn(e.getMessage());
}