[Goldeneye][ONOS-4038] Support configurable heartbeat on DistributedClusterStore
- Add readComponentConfiguration method for @Modified
- Apply updated Tools
- Add unit test code
- Add checkNotNull about NodeId

Change-Id: If8b7d4c00f2c72d29c0abb6407530d76bc3f6d80
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index ac8692b..8445ac3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -21,12 +21,15 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.joda.time.DateTime;
 import org.onlab.packet.IpAddress;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ClusterStore;
@@ -40,8 +43,10 @@
 import org.onosproject.store.cluster.messaging.MessagingService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+import java.util.Dictionary;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -52,6 +57,7 @@
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
@@ -73,9 +79,15 @@
 
     public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
 
-    // TODO: make these configurable.
-    private static final int HEARTBEAT_INTERVAL_MS = 100;
-    private static final int PHI_FAILURE_THRESHOLD = 10;
+    private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
+    @Property(name = "heartbeatInterval", intValue = DEFAULT_HEARTBEAT_INTERVAL,
+            label = "Interval time to send heartbeat to other controller nodes (millisecond)")
+    private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+
+    private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
+    @Property(name = "phiFailureThreshold", intValue = DEFAULT_PHI_FAILURE_THRESHOLD,
+            label = "the value of Phi threshold to detect accrual failure")
+    private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
 
     private static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
@@ -119,7 +131,7 @@
         failureDetector = new PhiAccrualFailureDetector();
 
         heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
-                                               HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
+                                               heartbeatInterval, TimeUnit.MILLISECONDS);
 
         log.info("Started");
     }
@@ -133,6 +145,12 @@
         log.info("Stopped");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        readComponentConfiguration(context);
+        restartHeartbeatSender();
+    }
+
     @Override
     public void setDelegate(ClusterStoreDelegate delegate) {
         checkNotNull(delegate, "Delegate cannot be null");
@@ -178,6 +196,7 @@
 
     @Override
     public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
         ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
         addNode(node);
         return node;
@@ -220,7 +239,7 @@
                 heartbeatToPeer(hbMessagePayload, node);
                 State currentState = nodeStates.get(node.id());
                 double phi = failureDetector.phi(node.id());
-                if (phi >= PHI_FAILURE_THRESHOLD) {
+                if (phi >= phiFailureThreshold) {
                     if (currentState.isActive()) {
                         updateState(node.id(), State.INACTIVE);
                     }
@@ -291,4 +310,98 @@
         return nodeStateLastUpdatedTimes.get(nodeId);
     }
 
-}
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        Integer newHeartbeatInterval = Tools.getIntegerProperty(properties,
+                                                                "heartbeatInterval");
+        if (newHeartbeatInterval == null) {
+            setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL);
+            log.info("Heartbeat interval time is not configured, default value is {}",
+                     DEFAULT_HEARTBEAT_INTERVAL);
+        } else {
+            setHeartbeatInterval(newHeartbeatInterval);
+            log.info("Configured. Heartbeat interval time is configured to {}",
+                     heartbeatInterval);
+        }
+
+        Integer newPhiFailureThreshold = Tools.getIntegerProperty(properties,
+                                                                  "phiFailureThreshold");
+        if (newPhiFailureThreshold == null) {
+            setPhiFailureThreshold(DEFAULT_PHI_FAILURE_THRESHOLD);
+            log.info("Phi failure threshold is not configured, default value is {}",
+                     DEFAULT_PHI_FAILURE_THRESHOLD);
+        } else {
+            setPhiFailureThreshold(newPhiFailureThreshold);
+            log.info("Configured. Phi failure threshold is configured to {}",
+                     phiFailureThreshold);
+        }
+    }
+
+    /**
+     * Sets heartbeat interval between the termination of one execution of heartbeat
+     * and the commencement of the next.
+     *
+     * @param interval term between each heartbeat
+     */
+    private void setHeartbeatInterval(int interval) {
+        try {
+            checkArgument(interval > 0, "Interval must be greater than zero");
+            heartbeatInterval = interval;
+        } catch (IllegalArgumentException e) {
+            log.warn(e.getMessage());
+            heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+        }
+    }
+
+    /**
+     * Sets Phi failure threshold.
+     * Phi is based on a paper titled: "The φ Accrual Failure Detector" by Hayashibara, et al.
+     *
+     * @param threshold
+     */
+    private void setPhiFailureThreshold(int threshold) {
+        phiFailureThreshold = threshold;
+    }
+
+    /**
+     * Restarts heartbeatSender executor.
+     *
+     */
+    private void restartHeartbeatSender() {
+        try {
+            ScheduledExecutorService prevSender = heartBeatSender;
+            heartBeatSender = Executors.newSingleThreadScheduledExecutor(
+                    groupedThreads("onos/cluster/membership", "heartbeat-sender-%d"));
+            heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
+                                                   heartbeatInterval, TimeUnit.MILLISECONDS);
+            prevSender.shutdown();
+        } catch (Exception e) {
+            log.warn(e.getMessage());
+        }
+    }
+
+    /**
+     * Gets current heartbeat interval.
+     *
+     * @return heartbeatInterval
+     */
+    private int getHeartbeatInterval() {
+        return heartbeatInterval;
+    }
+
+    /**
+     * Gets current Phi failure threshold for Accrual Failure Detector.
+     *
+     * @return phiFailureThreshold
+     */
+    private int getPhiFailureThreshold() {
+        return phiFailureThreshold;
+    }
+
+}
\ No newline at end of file