[Goldeneye][ONOS-4038] Support configurable heartbeat on DistributedClusterStore
- Add readComponentConfiguration method for @Modified
- Apply updated Tools
- Add unit test code
- Add checkNotNull about NodeId
Change-Id: If8b7d4c00f2c72d29c0abb6407530d76bc3f6d80
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index ac8692b..8445ac3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -21,12 +21,15 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterStore;
@@ -40,8 +43,10 @@
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
+import java.util.Dictionary;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -52,6 +57,7 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
@@ -73,9 +79,15 @@
public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
- // TODO: make these configurable.
- private static final int HEARTBEAT_INTERVAL_MS = 100;
- private static final int PHI_FAILURE_THRESHOLD = 10;
+ private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
+ @Property(name = "heartbeatInterval", intValue = DEFAULT_HEARTBEAT_INTERVAL,
+ label = "Interval time to send heartbeat to other controller nodes (millisecond)")
+ private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+
+ private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
+ @Property(name = "phiFailureThreshold", intValue = DEFAULT_PHI_FAILURE_THRESHOLD,
+ label = "the value of Phi threshold to detect accrual failure")
+ private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
@@ -119,7 +131,7 @@
failureDetector = new PhiAccrualFailureDetector();
heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
- HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ heartbeatInterval, TimeUnit.MILLISECONDS);
log.info("Started");
}
@@ -133,6 +145,12 @@
log.info("Stopped");
}
+ @Modified
+ public void modified(ComponentContext context) {
+ readComponentConfiguration(context);
+ restartHeartbeatSender();
+ }
+
@Override
public void setDelegate(ClusterStoreDelegate delegate) {
checkNotNull(delegate, "Delegate cannot be null");
@@ -178,6 +196,7 @@
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
addNode(node);
return node;
@@ -220,7 +239,7 @@
heartbeatToPeer(hbMessagePayload, node);
State currentState = nodeStates.get(node.id());
double phi = failureDetector.phi(node.id());
- if (phi >= PHI_FAILURE_THRESHOLD) {
+ if (phi >= phiFailureThreshold) {
if (currentState.isActive()) {
updateState(node.id(), State.INACTIVE);
}
@@ -291,4 +310,98 @@
return nodeStateLastUpdatedTimes.get(nodeId);
}
-}
+ /**
+ * Extracts properties from the component configuration context.
+ *
+ * @param context the component context
+ */
+ private void readComponentConfiguration(ComponentContext context) {
+ Dictionary<?, ?> properties = context.getProperties();
+
+ Integer newHeartbeatInterval = Tools.getIntegerProperty(properties,
+ "heartbeatInterval");
+ if (newHeartbeatInterval == null) {
+ setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL);
+ log.info("Heartbeat interval time is not configured, default value is {}",
+ DEFAULT_HEARTBEAT_INTERVAL);
+ } else {
+ setHeartbeatInterval(newHeartbeatInterval);
+ log.info("Configured. Heartbeat interval time is configured to {}",
+ heartbeatInterval);
+ }
+
+ Integer newPhiFailureThreshold = Tools.getIntegerProperty(properties,
+ "phiFailureThreshold");
+ if (newPhiFailureThreshold == null) {
+ setPhiFailureThreshold(DEFAULT_PHI_FAILURE_THRESHOLD);
+ log.info("Phi failure threshold is not configured, default value is {}",
+ DEFAULT_PHI_FAILURE_THRESHOLD);
+ } else {
+ setPhiFailureThreshold(newPhiFailureThreshold);
+ log.info("Configured. Phi failure threshold is configured to {}",
+ phiFailureThreshold);
+ }
+ }
+
+ /**
+ * Sets heartbeat interval between the termination of one execution of heartbeat
+ * and the commencement of the next.
+ *
+ * @param interval term between each heartbeat
+ */
+ private void setHeartbeatInterval(int interval) {
+ try {
+ checkArgument(interval > 0, "Interval must be greater than zero");
+ heartbeatInterval = interval;
+ } catch (IllegalArgumentException e) {
+ log.warn(e.getMessage());
+ heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+ }
+ }
+
+ /**
+ * Sets Phi failure threshold.
+ * Phi is based on a paper titled: "The φ Accrual Failure Detector" by Hayashibara, et al.
+ *
+ * @param threshold
+ */
+ private void setPhiFailureThreshold(int threshold) {
+ phiFailureThreshold = threshold;
+ }
+
+ /**
+ * Restarts heartbeatSender executor.
+ *
+ */
+ private void restartHeartbeatSender() {
+ try {
+ ScheduledExecutorService prevSender = heartBeatSender;
+ heartBeatSender = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/cluster/membership", "heartbeat-sender-%d"));
+ heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
+ heartbeatInterval, TimeUnit.MILLISECONDS);
+ prevSender.shutdown();
+ } catch (Exception e) {
+ log.warn(e.getMessage());
+ }
+ }
+
+ /**
+ * Gets current heartbeat interval.
+ *
+ * @return heartbeatInterval
+ */
+ private int getHeartbeatInterval() {
+ return heartbeatInterval;
+ }
+
+ /**
+ * Gets current Phi failure threshold for Accrual Failure Detector.
+ *
+ * @return phiFailureThreshold
+ */
+ private int getPhiFailureThreshold() {
+ return phiFailureThreshold;
+ }
+
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
index cdb138b..22911f6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
@@ -33,15 +33,20 @@
public class PhiAccrualFailureDetector {
private final Map<NodeId, History> states = Maps.newConcurrentMap();
- // TODO: make these configurable.
- private static final int WINDOW_SIZE = 250;
- private static final int MIN_SAMPLES = 25;
- private static final double PHI_FACTOR = 1.0 / Math.log(10.0);
+ // Default value
+ private static final int DEFAULT_WINDOW_SIZE = 250;
+ private static final int DEFAULT_MIN_SAMPLES = 25;
+ private static final double DEFAULT_PHI_FACTOR = 1.0 / Math.log(10.0);
// If a node does not have any heartbeats, this is the phi
// value to report. Indicates the node is inactive (from the
// detectors perspective.
- private static final double BOOTSTRAP_PHI_VALUE = 100.0;
+ private static final double DEFAULT_BOOTSTRAP_PHI_VALUE = 100.0;
+
+
+ private int minSamples = DEFAULT_MIN_SAMPLES;
+ private double phiFactor = DEFAULT_PHI_FACTOR;
+ private double bootstrapPhiValue = DEFAULT_BOOTSTRAP_PHI_VALUE;
/**
* Report a new heart beat for the specified node id.
@@ -70,6 +75,8 @@
}
}
+
+
/**
* Compute phi for the specified node id.
* @param nodeId node id
@@ -78,13 +85,13 @@
public double phi(NodeId nodeId) {
checkNotNull(nodeId, "NodeId must not be null");
if (!states.containsKey(nodeId)) {
- return BOOTSTRAP_PHI_VALUE;
+ return bootstrapPhiValue;
}
History nodeState = states.get(nodeId);
synchronized (nodeState) {
long latestHeartbeat = nodeState.latestHeartbeatTime();
DescriptiveStatistics samples = nodeState.samples();
- if (latestHeartbeat == -1 || samples.getN() < MIN_SAMPLES) {
+ if (latestHeartbeat == -1 || samples.getN() < minSamples) {
return 0.0;
}
return computePhi(samples, latestHeartbeat, System.currentTimeMillis());
@@ -95,13 +102,27 @@
long size = samples.getN();
long t = tNow - tLast;
return (size > 0)
- ? PHI_FACTOR * t / samples.getMean()
- : BOOTSTRAP_PHI_VALUE;
+ ? phiFactor * t / samples.getMean()
+ : bootstrapPhiValue;
}
+
+ private void setMinSamples(int samples) {
+ minSamples = samples;
+ }
+
+ private void setPhiFactor(double factor) {
+ phiFactor = factor;
+ }
+
+ private void setBootstrapPhiValue(double phiValue) {
+ bootstrapPhiValue = phiValue;
+ }
+
+
private static class History {
DescriptiveStatistics samples =
- new DescriptiveStatistics(WINDOW_SIZE);
+ new DescriptiveStatistics(DEFAULT_WINDOW_SIZE);
long lastHeartbeatTime = -1;
public DescriptiveStatistics samples() {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/DistributedClusterStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/DistributedClusterStoreTest.java
new file mode 100644
index 0000000..6efbae4
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/DistributedClusterStoreTest.java
@@ -0,0 +1,24 @@
+package org.onosproject.store.cluster.impl;
+
+import org.junit.After;
+import org.junit.Before;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit test for DistributedClusterStore.
+ */
+public class DistributedClusterStoreTest {
+ DistributedClusterStore distributedClusterStore;
+
+ @Before
+ public void setUp() throws Exception {
+ distributedClusterStore = new DistributedClusterStore();
+ distributedClusterStore.activate();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ distributedClusterStore.deactivate();
+ }
+}
\ No newline at end of file