[ONOS-7753] Ensure cluster heartbeats are recorded when last heartbeat time is set to default value
Change-Id: If44db5029ec8e55b86e26b943ba4d7bf3afd6d06
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index e8a6f01..dbdbabe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -342,8 +342,8 @@
if (clusterMetadataService.getClusterMetadata().getNodes().contains(hb.source())) {
// Avoid reporting heartbeats that have been enqueued by setting a minimum interval.
long heartbeatTime = System.currentTimeMillis();
- long lastHeartbeatTime = failureDetector.getLastHeartbeatTime(hb.source().id());
- if (heartbeatTime - lastHeartbeatTime > heartbeatInterval / 2) {
+ Long lastHeartbeatTime = failureDetector.getLastHeartbeatTime(hb.source().id());
+ if (lastHeartbeatTime == null || heartbeatTime - lastHeartbeatTime > heartbeatInterval / 2) {
failureDetector.report(hb.source().id(), heartbeatTime);
}
updateNode(hb.source().id(), hb.state, hb.version);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
index f6acac3..b34ccb0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
@@ -15,15 +15,14 @@
*/
package org.onosproject.store.cluster.impl;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import java.util.Map;
+import com.google.common.collect.Maps;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.onosproject.cluster.NodeId;
-import com.google.common.collect.Maps;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* Phi Accrual failure detector.
@@ -31,7 +30,7 @@
* Based on a paper titled: "The φ Accrual Failure Detector" by Hayashibara, et al.
*/
public class PhiAccrualFailureDetector {
- private final Map<NodeId, History> states = Maps.newConcurrentMap();
+ private final Map<NodeId, NodeFailureDetector> nodes = Maps.newConcurrentMap();
// Default value
private static final int DEFAULT_WINDOW_SIZE = 250;
@@ -68,9 +67,9 @@
* @param nodeId the node identifier
* @return the last heartbeat time for the given node
*/
- public long getLastHeartbeatTime(NodeId nodeId) {
- History nodeState = states.computeIfAbsent(nodeId, key -> new History());
- return nodeState.latestHeartbeatTime();
+ public Long getLastHeartbeatTime(NodeId nodeId) {
+ NodeFailureDetector node = nodes.get(nodeId);
+ return node != null ? node.lastHeartbeatTime() : null;
}
/**
@@ -89,13 +88,11 @@
public void report(NodeId nodeId, long arrivalTime) {
checkNotNull(nodeId, "NodeId must not be null");
checkArgument(arrivalTime >= 0, "arrivalTime must not be negative");
- History nodeState = states.computeIfAbsent(nodeId, key -> new History());
- synchronized (nodeState) {
- long latestHeartbeat = nodeState.latestHeartbeatTime();
- if (latestHeartbeat != -1) {
- nodeState.samples().addValue(arrivalTime - latestHeartbeat);
- }
- nodeState.setLatestHeartbeatTime(arrivalTime);
+ NodeFailureDetector node = nodes.get(nodeId);
+ if (node == null) {
+ nodes.computeIfAbsent(nodeId, key -> new NodeFailureDetector());
+ } else {
+ node.setLastHeartbeatTime(arrivalTime);
}
}
@@ -105,7 +102,7 @@
* @param nodeId node identifier for the node for which to reset the failure detector
*/
public void reset(NodeId nodeId) {
- states.remove(nodeId);
+ nodes.remove(nodeId);
}
/**
@@ -114,47 +111,68 @@
* @return phi value
*/
public double phi(NodeId nodeId) {
+ return phi(nodeId, System.currentTimeMillis());
+ }
+
+ /**
+ * Compute phi for the specified node id.
+ * @param nodeId node id
+ * @param currentTime the current timestamp
+ * @return phi value
+ */
+ public double phi(NodeId nodeId, long currentTime) {
checkNotNull(nodeId, "NodeId must not be null");
- if (!states.containsKey(nodeId)) {
+ NodeFailureDetector node = nodes.get(nodeId);
+ if (node == null) {
return bootstrapPhiValue;
}
- History nodeState = states.get(nodeId);
- synchronized (nodeState) {
- long latestHeartbeat = nodeState.latestHeartbeatTime();
- DescriptiveStatistics samples = nodeState.samples();
- if (latestHeartbeat == -1 || samples.getN() < minSamples) {
- return 0.0;
- }
- return computePhi(samples, latestHeartbeat, System.currentTimeMillis());
- }
+ return node.computePhi(currentTime);
}
- private double computePhi(DescriptiveStatistics samples, long tLast, long tNow) {
- long elapsedTime = tNow - tLast;
- double meanMillis = samples.getMean();
- double y = (elapsedTime - meanMillis) / Math.max(samples.getStandardDeviation(), minStandardDeviationMillis);
- double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
- if (elapsedTime > meanMillis) {
- return -Math.log10(e / (1.0 + e));
- } else {
- return -Math.log10(1.0 - 1.0 / (1.0 + e));
- }
- }
+ private class NodeFailureDetector {
+ private final DescriptiveStatistics samples = new DescriptiveStatistics(DEFAULT_WINDOW_SIZE);
+ private volatile long lastHeartbeatTime = System.currentTimeMillis();
- private static class History {
- DescriptiveStatistics samples = new DescriptiveStatistics(DEFAULT_WINDOW_SIZE);
- long lastHeartbeatTime = -1;
-
- DescriptiveStatistics samples() {
- return samples;
- }
-
- long latestHeartbeatTime() {
+ /**
+ * Returns the last heartbeat time.
+ *
+ * @return the last heartbeat time
+ */
+ long lastHeartbeatTime() {
return lastHeartbeatTime;
}
- void setLatestHeartbeatTime(long value) {
- lastHeartbeatTime = value;
+ /**
+ * Sets the last heartbeat time.
+ *
+ * @param heartbeatTime the last heartbeat time
+ */
+ synchronized void setLastHeartbeatTime(long heartbeatTime) {
+ samples.addValue(heartbeatTime - lastHeartbeatTime);
+ lastHeartbeatTime = heartbeatTime;
+ }
+
+ /**
+ * Computes the phi value for this node.
+ *
+ * @param currentTime the current timestamp
+ * @return the phi value
+ */
+ synchronized double computePhi(long currentTime) {
+ if (samples.getN() < minSamples) {
+ return 0.0;
+ }
+
+ long elapsedTime = currentTime - lastHeartbeatTime;
+ double meanMillis = samples.getMean();
+ double standardDeviation = samples.getStandardDeviation();
+ double y = (elapsedTime - meanMillis) / Math.max(standardDeviation, minStandardDeviationMillis);
+ double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
+ if (elapsedTime > meanMillis) {
+ return -Math.log10(e / (1.0 + e));
+ } else {
+ return -Math.log10(1.0 - 1.0 / (1.0 + e));
+ }
}
}
}
\ No newline at end of file