[Goldeneye][ONOS-4038] Support configurable heartbeat on DistributedClusterStore
- Add readComponentConfiguration method for @Modified
- Apply updated Tools
- Add unit test code
- Add checkNotNull about NodeId

Change-Id: If8b7d4c00f2c72d29c0abb6407530d76bc3f6d80
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index ac8692b..8445ac3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -21,12 +21,15 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.joda.time.DateTime;
 import org.onlab.packet.IpAddress;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ClusterStore;
@@ -40,8 +43,10 @@
 import org.onosproject.store.cluster.messaging.MessagingService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+import java.util.Dictionary;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -52,6 +57,7 @@
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
@@ -73,9 +79,15 @@
 
     public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
 
-    // TODO: make these configurable.
-    private static final int HEARTBEAT_INTERVAL_MS = 100;
-    private static final int PHI_FAILURE_THRESHOLD = 10;
+    private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
+    @Property(name = "heartbeatInterval", intValue = DEFAULT_HEARTBEAT_INTERVAL,
+            label = "Interval time to send heartbeat to other controller nodes (millisecond)")
+    private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+
+    private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
+    @Property(name = "phiFailureThreshold", intValue = DEFAULT_PHI_FAILURE_THRESHOLD,
+            label = "the value of Phi threshold to detect accrual failure")
+    private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
 
     private static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
@@ -119,7 +131,7 @@
         failureDetector = new PhiAccrualFailureDetector();
 
         heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
-                                               HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
+                                               heartbeatInterval, TimeUnit.MILLISECONDS);
 
         log.info("Started");
     }
@@ -133,6 +145,12 @@
         log.info("Stopped");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        readComponentConfiguration(context);
+        restartHeartbeatSender();
+    }
+
     @Override
     public void setDelegate(ClusterStoreDelegate delegate) {
         checkNotNull(delegate, "Delegate cannot be null");
@@ -178,6 +196,7 @@
 
     @Override
     public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
         ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
         addNode(node);
         return node;
@@ -220,7 +239,7 @@
                 heartbeatToPeer(hbMessagePayload, node);
                 State currentState = nodeStates.get(node.id());
                 double phi = failureDetector.phi(node.id());
-                if (phi >= PHI_FAILURE_THRESHOLD) {
+                if (phi >= phiFailureThreshold) {
                     if (currentState.isActive()) {
                         updateState(node.id(), State.INACTIVE);
                     }
@@ -291,4 +310,98 @@
         return nodeStateLastUpdatedTimes.get(nodeId);
     }
 
-}
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        Integer newHeartbeatInterval = Tools.getIntegerProperty(properties,
+                                                                "heartbeatInterval");
+        if (newHeartbeatInterval == null) {
+            setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL);
+            log.info("Heartbeat interval time is not configured, default value is {}",
+                     DEFAULT_HEARTBEAT_INTERVAL);
+        } else {
+            setHeartbeatInterval(newHeartbeatInterval);
+            log.info("Configured. Heartbeat interval time is configured to {}",
+                     heartbeatInterval);
+        }
+
+        Integer newPhiFailureThreshold = Tools.getIntegerProperty(properties,
+                                                                  "phiFailureThreshold");
+        if (newPhiFailureThreshold == null) {
+            setPhiFailureThreshold(DEFAULT_PHI_FAILURE_THRESHOLD);
+            log.info("Phi failure threshold is not configured, default value is {}",
+                     DEFAULT_PHI_FAILURE_THRESHOLD);
+        } else {
+            setPhiFailureThreshold(newPhiFailureThreshold);
+            log.info("Configured. Phi failure threshold is configured to {}",
+                     phiFailureThreshold);
+        }
+    }
+
+    /**
+     * Sets heartbeat interval between the termination of one execution of heartbeat
+     * and the commencement of the next.
+     *
+     * @param interval term between each heartbeat
+     */
+    private void setHeartbeatInterval(int interval) {
+        try {
+            checkArgument(interval > 0, "Interval must be greater than zero");
+            heartbeatInterval = interval;
+        } catch (IllegalArgumentException e) {
+            log.warn(e.getMessage());
+            heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+        }
+    }
+
+    /**
+     * Sets Phi failure threshold.
+     * Phi is based on a paper titled: "The φ Accrual Failure Detector" by Hayashibara, et al.
+     *
+     * @param threshold
+     */
+    private void setPhiFailureThreshold(int threshold) {
+        phiFailureThreshold = threshold;
+    }
+
+    /**
+     * Restarts heartbeatSender executor.
+     *
+     */
+    private void restartHeartbeatSender() {
+        try {
+            ScheduledExecutorService prevSender = heartBeatSender;
+            heartBeatSender = Executors.newSingleThreadScheduledExecutor(
+                    groupedThreads("onos/cluster/membership", "heartbeat-sender-%d"));
+            heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
+                                                   heartbeatInterval, TimeUnit.MILLISECONDS);
+            prevSender.shutdown();
+        } catch (Exception e) {
+            log.warn(e.getMessage());
+        }
+    }
+
+    /**
+     * Gets current heartbeat interval.
+     *
+     * @return heartbeatInterval
+     */
+    private int getHeartbeatInterval() {
+        return heartbeatInterval;
+    }
+
+    /**
+     * Gets current Phi failure threshold for Accrual Failure Detector.
+     *
+     * @return phiFailureThreshold
+     */
+    private int getPhiFailureThreshold() {
+        return phiFailureThreshold;
+    }
+
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
index cdb138b..22911f6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
@@ -33,15 +33,20 @@
 public class PhiAccrualFailureDetector {
     private final Map<NodeId, History> states = Maps.newConcurrentMap();
 
-    // TODO: make these configurable.
-    private static final int WINDOW_SIZE = 250;
-    private static final int MIN_SAMPLES = 25;
-    private static final double PHI_FACTOR = 1.0 / Math.log(10.0);
+    // Default value
+    private static final int DEFAULT_WINDOW_SIZE = 250;
+    private static final int DEFAULT_MIN_SAMPLES = 25;
+    private static final double DEFAULT_PHI_FACTOR = 1.0 / Math.log(10.0);
 
     // If a node does not have any heartbeats, this is the phi
     // value to report. Indicates the node is inactive (from the
     // detectors perspective.
-    private static final double BOOTSTRAP_PHI_VALUE = 100.0;
+    private static final double DEFAULT_BOOTSTRAP_PHI_VALUE = 100.0;
+
+
+    private int minSamples = DEFAULT_MIN_SAMPLES;
+    private double phiFactor = DEFAULT_PHI_FACTOR;
+    private double bootstrapPhiValue = DEFAULT_BOOTSTRAP_PHI_VALUE;
 
     /**
      * Report a new heart beat for the specified node id.
@@ -70,6 +75,8 @@
         }
     }
 
+
+
     /**
      * Compute phi for the specified node id.
      * @param nodeId node id
@@ -78,13 +85,13 @@
     public double phi(NodeId nodeId) {
         checkNotNull(nodeId, "NodeId must not be null");
         if (!states.containsKey(nodeId)) {
-            return BOOTSTRAP_PHI_VALUE;
+            return bootstrapPhiValue;
         }
         History nodeState = states.get(nodeId);
         synchronized (nodeState) {
             long latestHeartbeat = nodeState.latestHeartbeatTime();
             DescriptiveStatistics samples = nodeState.samples();
-            if (latestHeartbeat == -1 || samples.getN() < MIN_SAMPLES) {
+            if (latestHeartbeat == -1 || samples.getN() < minSamples) {
                 return 0.0;
             }
             return computePhi(samples, latestHeartbeat, System.currentTimeMillis());
@@ -95,13 +102,27 @@
         long size = samples.getN();
         long t = tNow - tLast;
         return (size > 0)
-               ? PHI_FACTOR * t / samples.getMean()
-               : BOOTSTRAP_PHI_VALUE;
+                ? phiFactor * t / samples.getMean()
+                : bootstrapPhiValue;
     }
 
+
+    private void setMinSamples(int samples) {
+        minSamples = samples;
+    }
+
+    private void setPhiFactor(double factor) {
+        phiFactor = factor;
+    }
+
+    private void setBootstrapPhiValue(double phiValue) {
+        bootstrapPhiValue = phiValue;
+    }
+
+
     private static class History {
         DescriptiveStatistics samples =
-                new DescriptiveStatistics(WINDOW_SIZE);
+                new DescriptiveStatistics(DEFAULT_WINDOW_SIZE);
         long lastHeartbeatTime = -1;
 
         public DescriptiveStatistics samples() {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/DistributedClusterStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/DistributedClusterStoreTest.java
new file mode 100644
index 0000000..6efbae4
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/impl/DistributedClusterStoreTest.java
@@ -0,0 +1,24 @@
+package org.onosproject.store.cluster.impl;
+
+import org.junit.After;
+import org.junit.Before;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit test for DistributedClusterStore.
+ */
+public class DistributedClusterStoreTest {
+    DistributedClusterStore distributedClusterStore;
+
+    @Before
+    public void setUp() throws Exception {
+        distributedClusterStore = new DistributedClusterStore();
+        distributedClusterStore.activate();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        distributedClusterStore.deactivate();
+    }
+}
\ No newline at end of file