[Goldeneye][ONOS-4038] Support configurable heartbeat on DistributedClusterStore
- Add readComponentConfiguration method for @Modified
- Apply updated Tools
- Add unit tests
- Add checkNotNull validation of NodeId in addNode
Change-Id: If8b7d4c00f2c72d29c0abb6407530d76bc3f6d80
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index ac8692b..8445ac3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -21,12 +21,15 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterStore;
@@ -40,8 +43,10 @@
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
+import java.util.Dictionary;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -52,6 +57,7 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
@@ -73,9 +79,15 @@
public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
- // TODO: make these configurable.
- private static final int HEARTBEAT_INTERVAL_MS = 100;
- private static final int PHI_FAILURE_THRESHOLD = 10;
+ private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
+ @Property(name = "heartbeatInterval", intValue = DEFAULT_HEARTBEAT_INTERVAL,
+ label = "Interval time to send heartbeat to other controller nodes (millisecond)")
+ private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+
+ private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
+ @Property(name = "phiFailureThreshold", intValue = DEFAULT_PHI_FAILURE_THRESHOLD,
+ label = "the value of Phi threshold to detect accrual failure")
+ private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
@@ -119,7 +131,7 @@
failureDetector = new PhiAccrualFailureDetector();
heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
- HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ heartbeatInterval, TimeUnit.MILLISECONDS);
log.info("Started");
}
@@ -133,6 +145,12 @@
log.info("Stopped");
}
+    /**
+     * Handles a component configuration update.
+     * Re-reads the configurable properties and restarts the heartbeat sender
+     * only when the heartbeat interval actually changed. The interval is handed
+     * to the executor at scheduling time, so a new value needs a reschedule;
+     * the Phi failure threshold is a plain field compared against phi directly,
+     * so changing it alone does not require a restart.
+     *
+     * @param context the component context carrying the updated properties
+     */
+    @Modified
+    public void modified(ComponentContext context) {
+        int previousInterval = heartbeatInterval;
+        readComponentConfiguration(context);
+        // Skip the restart when the interval is unchanged: restarting schedules
+        // the new sender with zero initial delay and discards the old executor.
+        if (heartbeatInterval != previousInterval) {
+            restartHeartbeatSender();
+        }
+    }
+
@Override
public void setDelegate(ClusterStoreDelegate delegate) {
checkNotNull(delegate, "Delegate cannot be null");
@@ -178,6 +196,7 @@
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
addNode(node);
return node;
@@ -220,7 +239,7 @@
heartbeatToPeer(hbMessagePayload, node);
State currentState = nodeStates.get(node.id());
double phi = failureDetector.phi(node.id());
- if (phi >= PHI_FAILURE_THRESHOLD) {
+ if (phi >= phiFailureThreshold) {
if (currentState.isActive()) {
updateState(node.id(), State.INACTIVE);
}
@@ -291,4 +310,98 @@
return nodeStateLastUpdatedTimes.get(nodeId);
}
-}
+    /**
+     * Loads the "heartbeatInterval" and "phiFailureThreshold" properties from
+     * the component context and applies them through the matching setters.
+     * Whenever the lookup for a property yields {@code null}, the corresponding
+     * default constant is applied instead. The outcome is logged either way.
+     *
+     * @param context the component context whose properties are consulted
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        Dictionary<?, ?> props = context.getProperties();
+
+        // Heartbeat interval: configured value wins, otherwise the default.
+        Integer intervalValue = Tools.getIntegerProperty(props, "heartbeatInterval");
+        if (intervalValue != null) {
+            setHeartbeatInterval(intervalValue);
+            // Log the field rather than the raw value: the setter may have rejected it.
+            log.info("Configured. Heartbeat interval time is configured to {}",
+                     heartbeatInterval);
+        } else {
+            setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL);
+            log.info("Heartbeat interval time is not configured, default value is {}",
+                     DEFAULT_HEARTBEAT_INTERVAL);
+        }
+
+        // Phi failure threshold: same pattern as above.
+        Integer thresholdValue = Tools.getIntegerProperty(props, "phiFailureThreshold");
+        if (thresholdValue != null) {
+            setPhiFailureThreshold(thresholdValue);
+            log.info("Configured. Phi failure threshold is configured to {}",
+                     phiFailureThreshold);
+        } else {
+            setPhiFailureThreshold(DEFAULT_PHI_FAILURE_THRESHOLD);
+            log.info("Phi failure threshold is not configured, default value is {}",
+                     DEFAULT_PHI_FAILURE_THRESHOLD);
+        }
+    }
+
+    /**
+     * Updates the heartbeat interval, i.e. the delay between the end of one
+     * heartbeat execution and the start of the next.
+     * A value that is not strictly positive is rejected: a warning is logged
+     * and the default interval is applied instead.
+     *
+     * @param interval requested delay between heartbeats; must be greater than zero
+     */
+    private void setHeartbeatInterval(int interval) {
+        // Start from the fallback; it is only replaced once validation passes.
+        int accepted = DEFAULT_HEARTBEAT_INTERVAL;
+        try {
+            checkArgument(interval > 0, "Interval must be greater than zero");
+            accepted = interval;
+        } catch (IllegalArgumentException e) {
+            log.warn(e.getMessage());
+        }
+        heartbeatInterval = accepted;
+    }
+
+    /**
+     * Updates the Phi failure threshold.
+     * Phi is based on a paper titled: "The φ Accrual Failure Detector" by Hayashibara, et al.
+     * The value is stored as given; no range check is applied here.
+     *
+     * @param threshold the phi value at or above which a node is treated as failed
+     */
+    private void setPhiFailureThreshold(int threshold) {
+        this.phiFailureThreshold = threshold;
+    }
+
+    /**
+     * Restarts the heartbeatSender executor so that the current heartbeat
+     * interval takes effect.
+     * A replacement executor is created and scheduled first; only after that
+     * succeeds is it published in the {@code heartBeatSender} field and the
+     * previous executor shut down. If anything fails, the failure is logged
+     * with its stack trace and an unpublished replacement is shut down, so the
+     * field keeps referring to the executor that is actually running.
+     */
+    private void restartHeartbeatSender() {
+        ScheduledExecutorService prevSender = heartBeatSender;
+        ScheduledExecutorService newSender = null;
+        try {
+            newSender = Executors.newSingleThreadScheduledExecutor(
+                    groupedThreads("onos/cluster/membership", "heartbeat-sender-%d"));
+            newSender.scheduleWithFixedDelay(this::heartbeat, 0,
+                                             heartbeatInterval, TimeUnit.MILLISECONDS);
+            // Publish only once scheduling succeeded, then retire the old sender.
+            heartBeatSender = newSender;
+            if (prevSender != null) {
+                prevSender.shutdown();
+            }
+        } catch (Exception e) {
+            // Pass the exception itself so the stack trace is not lost.
+            log.warn("Failed to restart heartbeat sender", e);
+            // Do not leak a replacement that never became the active sender.
+            if (newSender != null && newSender != heartBeatSender) {
+                newSender.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Returns the heartbeat interval currently in effect.
+     *
+     * @return the value of the heartbeatInterval field, in milliseconds
+     */
+    private int getHeartbeatInterval() {
+        return this.heartbeatInterval;
+    }
+
+    /**
+     * Returns the Phi failure threshold currently used by the
+     * Accrual Failure Detector check.
+     *
+     * @return the value of the phiFailureThreshold field
+     */
+    private int getPhiFailureThreshold() {
+        return this.phiFailureThreshold;
+    }
+
+}
\ No newline at end of file