Add features to the Node Diagnosis application

The Node Diagnosis application diagnoses controller node failures
and tries to recover the node.

This change adds a feature that analyses memory, socket descriptor, thread, and
file descriptor usage, identifies abnormal behaviour, and recovers the controller node.
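
As a rough usage sketch (assuming the standard ONOS component configuration
CLI; the property names come from OsgiPropertyConstants and the values below
are only illustrative), the new knobs can be tuned at runtime:

  cfg set org.onosproject.diagnosis.impl.NodeDiagnosisManager rebootRetryCount 5
  cfg set org.onosproject.diagnosis.impl.NodeDiagnosisManager initialClusterTimeoutPeriod 4
  cfg set org.onosproject.diagnosis.impl.NodeDiagnosisManager initialDiagnosisAction false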

Change-Id: Ib30022ee87f94b86861e3e3d961dd2469844b25c
diff --git a/apps/node-diagnosis/BUILD b/apps/node-diagnosis/BUILD
index 6bc723e..fea09c3 100644
--- a/apps/node-diagnosis/BUILD
+++ b/apps/node-diagnosis/BUILD
@@ -1,5 +1,7 @@
 COMPILE_DEPS = CORE_DEPS + CLI + [
     "@org_apache_karaf_bundle_core//jar",
+    "@sigar//jar",
+    "@jackson_databind//jar",
 ]
 
 osgi_jar_with_tests(
diff --git a/apps/node-diagnosis/src/main/java/org/onosproject/diagnosis/impl/NodeDiagnosisManager.java b/apps/node-diagnosis/src/main/java/org/onosproject/diagnosis/impl/NodeDiagnosisManager.java
index 1519e24..cee96c3 100644
--- a/apps/node-diagnosis/src/main/java/org/onosproject/diagnosis/impl/NodeDiagnosisManager.java
+++ b/apps/node-diagnosis/src/main/java/org/onosproject/diagnosis/impl/NodeDiagnosisManager.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.diagnosis.impl;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Modified;
@@ -44,37 +46,60 @@
 import org.slf4j.Logger;
 
 import java.util.Dictionary;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
+import java.io.File;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.Collections;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.hyperic.sigar.Sigar;
+import org.hyperic.sigar.ProcFd;
+import java.util.concurrent.ExecutorService;
 
-import static com.google.common.collect.Lists.newArrayList;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.SharedExecutors.getPoolThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onlab.util.Tools.isPropertyEnabled;
 import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.INITIAL_POLL_DELAY_MINUTE;
 import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.POLL_FREQUENCY_MINUTE;
 import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.DEFAULT_INITIAL_POLL_DELAY_MINUTE;
 import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.DEFAULT_POLL_FREQUENCY_MINUTE;
+import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.REBOOT_RETRY_COUNT;
+import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.DEFAULT_REBOOT_RETRY_COUNT;
+import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.INITIAL_CLUSTER_TIMEOUT_PERIOD;
+import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.DEFAULT_CLUSTER_TIMEOUT_PERIOD;
+import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.INITIAL_DIAGNOSIS_ACTION;
+import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.DEFAULT_DIAGNOSIS_ACTION;
 
 
 @Component(immediate = true,
         property = {
                 INITIAL_POLL_DELAY_MINUTE + ":Integer=" + DEFAULT_INITIAL_POLL_DELAY_MINUTE,
                 POLL_FREQUENCY_MINUTE + ":Integer=" + DEFAULT_POLL_FREQUENCY_MINUTE,
+                REBOOT_RETRY_COUNT + ":Integer=" + DEFAULT_REBOOT_RETRY_COUNT,
+                INITIAL_CLUSTER_TIMEOUT_PERIOD + ":Integer=" + DEFAULT_CLUSTER_TIMEOUT_PERIOD,
+                INITIAL_DIAGNOSIS_ACTION + ":Boolean=" + DEFAULT_DIAGNOSIS_ACTION,
         })
 public class NodeDiagnosisManager {
 
@@ -82,6 +107,12 @@
 
     private static int initialPollDelayMinute = DEFAULT_INITIAL_POLL_DELAY_MINUTE;
     private static int pollFrequencyMinute = DEFAULT_POLL_FREQUENCY_MINUTE;
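+    // Reboot bookkeeping (current reboot count and retry limit) is persisted in this file so it survives restarts.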
+    private static final File CFG_FILE = new File("../config/diag-info.json");
+    private static final String REBOOT_NU = "rebootNu";
+    private static int initialClusterTimeoutPeriod = DEFAULT_CLUSTER_TIMEOUT_PERIOD;
+    private static boolean initialDiagnosisAction = DEFAULT_DIAGNOSIS_ACTION;
+    private static int rebootRetryCount = DEFAULT_REBOOT_RETRY_COUNT;
+    private int rebootNu;
 
     private final Logger log = getLogger(getClass());
     private ScheduledExecutorService metricsExecutor;
@@ -106,7 +137,22 @@
     private BundleContext bundleContext;
     private ClusterAdminService caService;
     private NodeId localNodeId;
-    private Set<NodeId> nodeIds;
+    private Sigar sigar;
+    private static Process getTcpProc;
+    private static Process getUdpProc;
+    private static final String[] CMD_FOR_NETSTAT_PID = {"/bin/sh", "-c",
+            "ps -ef | grep netstat | grep -v grep | cut -c10-15 | tr -d \' \'"};
+    private static final String[] CMD_FOR_PID = {"/bin/sh", "-c",
+            "ps -ef | grep org.apache.karaf.main.Main | grep -v grep | cut -c10-15 | tr -d \' \'"};
+    private static final String[] CMD_FOR_TOTAL_MEMORY = {"/bin/sh", "-c",
+            "free -b | cut -d \' \' -f 5"};
+    private static long memoryThreshold;
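+    // Fixed descriptor limits; exceeding either is treated as abnormal behaviour for this node.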
+    private static final int FD_THRESHOLD = 50000;
+    private static final int SOCKETS_THRESHOLD = 50000;
+    private static final int DATA_BLOCK_1024 = 1024;
+    private static final int MEMORY_START_IDX = 3;
+    private static final String EXTRA_SPACE = "\\s+";
+    private static long pid;
 
     private static final long TIMEOUT = 3000;
 
@@ -115,10 +161,17 @@
         cfgService.registerProperties(getClass());
         bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
         getNodeId();
+        sigar = new Sigar();
         scheduleAppDiagnosisPolling();
         scheduleClusterNodeDiagnosisPolling();
+        getPid();
+        getMemoryThreshold();
+        scheduleSdncMemoryDiagnosisPolling();
+        scheduleSdncFileDescriptorDiagnosisPolling();
         communicationService.addSubscriber(REBOOT_MSG, new InternalSampleCollector(),
-                                           getPoolThreadExecutor());
+                getPoolThreadExecutor());
+        rebootNu = fetchRebootNu(); // the reboot count is persisted to a file so reboots can be limited across restarts
+        rebootRetryCount = fetchRetryRebootCount(); // maximum number of reboot retries
         log.info("Started");
     }
 
@@ -159,17 +212,21 @@
             changed = true;
         }
         log.info("Node Diagnosis properties are:" +
-                         " initialPollDelayMinute: {}, pollFrequencyMinute: {}",
-                 initialPollDelayMinute, pollFrequencyMinute);
+                        " initialPollDelayMinute: {}, pollFrequencyMinute: {}",
+                initialPollDelayMinute, pollFrequencyMinute);
         if (changed) {
             //stops the old scheduled task
             this.clusterNodeDiagnosisFuture.cancel(true);
             //schedules new task at the new polling rate
             log.info("New Scheduler started with,Node Diagnosis properties:" +
-                             " initialPollDelayMinute: {}, pollFrequencyMinute: {}",
-                     initialPollDelayMinute, pollFrequencyMinute);
+                            " initialPollDelayMinute: {}, pollFrequencyMinute: {}",
+                    initialPollDelayMinute, pollFrequencyMinute);
             scheduleClusterNodeDiagnosisPolling();
         }
+
+        int newRebootRetryCount = getNewRebootRetryCount(properties);
+        updateDiagFile(rebootNu, newRebootRetryCount);
+        initialDiagnosisAction = getNewDiagnosisAction(properties);
     }
 
     private int getNewPollFrequency(Dictionary<?, ?> properties) {
@@ -192,6 +249,26 @@
         return newPollDelay;
     }
 
+    private int getNewRebootRetryCount(Dictionary<?, ?> properties) {
+        int newRebootRetryCount;
+        try {
+            newRebootRetryCount = getIntegerProperty(properties, REBOOT_RETRY_COUNT);
+        } catch (NumberFormatException | ClassCastException e) {
+            newRebootRetryCount = DEFAULT_REBOOT_RETRY_COUNT;
+        }
+        return newRebootRetryCount;
+    }
+
+    private boolean getNewDiagnosisAction(Dictionary<?, ?> properties) {
+        boolean newDiagnosisAction;
+        try {
+            newDiagnosisAction = isPropertyEnabled(properties, INITIAL_DIAGNOSIS_ACTION);
+        } catch (NumberFormatException | ClassCastException e) {
+            newDiagnosisAction = DEFAULT_DIAGNOSIS_ACTION;
+        }
+        return newDiagnosisAction;
+    }
+
     private List<Bundle> getAllBundles() {
         return Arrays.asList(bundleContext.getBundles());
     }
@@ -201,11 +278,6 @@
         if (Objects.isNull(caService)) {
             return;
         }
-        List<ControllerNode> controllerNodes = newArrayList(caService.getNodes());
-        nodeIds = controllerNodes
-                .stream()
-                .map(ControllerNode::id)
-                .collect(Collectors.toSet());
 
         localNodeId = caService.getLocalNode().id();
     }
@@ -213,16 +285,29 @@
     private void scheduleAppDiagnosisPolling() {
         metricsExecutor = newSingleThreadScheduledExecutor(
                 groupedThreads("Nodediagnosis/diagnosisThread",
-                               "Nodediagnosis-executor-%d", log));
+                        "Nodediagnosis-executor-%d", log));
         metricsExecutor.scheduleAtFixedRate(this::appDiagnosis,
-                                            60,
-                                            30, TimeUnit.SECONDS);
+                60,
+                30, TimeUnit.SECONDS);
     }
 
     private void scheduleClusterNodeDiagnosisPolling() {
         clusterNodeDiagnosisFuture = metricsExecutor.scheduleAtFixedRate(this::clusterNodeDiagnosis,
-                                                                         initialPollDelayMinute,
-                                                                         pollFrequencyMinute, TimeUnit.MINUTES);
+                initialPollDelayMinute,
+                pollFrequencyMinute, TimeUnit.MINUTES);
+    }
+
+    private void scheduleSdncMemoryDiagnosisPolling() {
+        metricsExecutor.scheduleAtFixedRate(this::sdncMemoryDiagnosis,
+                60,
+                30, TimeUnit.SECONDS);
+    }
+
+    private void scheduleSdncFileDescriptorDiagnosisPolling() {
+        metricsExecutor.scheduleAtFixedRate(this::sdncFileDescriptorDiagnosis,
+                60,
+                30, TimeUnit.SECONDS);
     }
 
     private void appDiagnosis() {
@@ -270,9 +355,9 @@
             this.getAllBundles().forEach(
                     bundle -> {
                         log.debug("Bundle service - BundleName:{}, Diag:{}, number of lines of diag:{}",
-                                  bundleService.getInfo(bundle).getName(),
-                                  bundleService.getDiag(bundle),
-                                  bundleService.getDiag(bundle).split("[\n|\r]").length);
+                                bundleService.getInfo(bundle).getName(),
+                                bundleService.getDiag(bundle),
+                                bundleService.getDiag(bundle).split("[\n|\r]").length);
                     });
 
 
@@ -324,26 +409,341 @@
         return nonActiveComponents;
     }
 
+    private int fetchRebootNu() {
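+        // Reads the persisted reboot count from the diagnosis file; defaults to 0 when the file is missing or unreadable.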
+        int rebootNum = 0;
+        if (!CFG_FILE.exists()) {
+            log.debug("CFG file not found for reboot number");
+            return rebootNum;
+        }
+
+        ObjectNode root;
+        try {
+            root = (ObjectNode) new ObjectMapper().readTree(CFG_FILE);
+            if (Objects.nonNull(root.findValue(REBOOT_NU))) {
+                rebootNum = root.findValue(REBOOT_NU).asInt();
+            }
+        } catch (IOException e) {
+            log.error("applyConfiguration: Exception occurred: {} for {}", e, CFG_FILE);
+        }
+        return rebootNum;
+    }
+
+    private int fetchRetryRebootCount() {
+        int rebootCount = rebootRetryCount;
+        if (!CFG_FILE.exists()) {
+            log.debug("CFG file not found for reboot number");
+            return rebootCount;
+        }
+
+        ObjectNode root;
+        try {
+            root = (ObjectNode) new ObjectMapper().readTree(CFG_FILE);
+            if (Objects.nonNull(root.findValue(REBOOT_RETRY_COUNT))) {
+                rebootCount = root.findValue(REBOOT_RETRY_COUNT).asInt();
+            }
+        } catch (IOException e) {
+            log.error("applyConfiguration: Exception occurred: {} for {}", e, CFG_FILE);
+        }
+        return rebootCount;
+    }
+
+    private void resetRebootNu() {
+        updateRebootNu(0);
+    }
+
+    private void updateRebootNu(int rebootnum) {
+        updateDiagFile(rebootnum, rebootRetryCount);
+    }
+
+    private void updateDiagFile(int rebootnum, int defaultRebootcount) {
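+        // Persists both counters and mirrors them into the in-memory fields so callers see a consistent view.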
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, Integer> output = new HashMap<>();
+        output.put(REBOOT_RETRY_COUNT, defaultRebootcount);
+        output.put(REBOOT_NU, rebootnum);
+        rebootNu = rebootnum;
+        rebootRetryCount = defaultRebootcount;
+        try {
+            mapper.writerWithDefaultPrettyPrinter().writeValue(CFG_FILE, output);
+        } catch (IOException e) {
+            log.error("Failed to write diagnosis info to {}", CFG_FILE, e);
+        }
+    }
+
     private void clusterNodeDiagnosis() {
         if (Objects.isNull(caService)) {
             return;
         }
-
-        List<ControllerNode> nodes = newArrayList(caService.getNodes());
-        Set<NodeId> activeNodes = nodes
-                .stream()
-                .filter(node -> caService.getState(node.id()) == ControllerNode.State.ACTIVE)
-                .filter(node -> caService.getLastUpdatedInstant(node.id()).until(Instant.now(), ChronoUnit.MINUTES) > 4)
-                .map(ControllerNode::id)
-                .collect(Collectors.toSet());
-        boolean isNodesActive = nodes
-                .stream().filter(node -> !(caService.getState(node.id()) == ControllerNode.State.INACTIVE))
-                .allMatch(node -> caService.getState(node.id()) == ControllerNode.State.ACTIVE);
-        if (Objects.nonNull(activeNodes) && !activeNodes.isEmpty()) {
-            multicastReboot(isNodesActive, activeNodes);
+        try {
+            if (caService.getState(localNodeId).equals(ControllerNode.State.READY)) {
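+                // Node is healthy again; clear any accumulated reboot count.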
+                if (rebootNu > 0) {
+                    resetRebootNu();
+                }
+                return;
+            }
+            long lastUpdatedInstant = caService.getLastUpdatedInstant(localNodeId).until(Instant.now(),
+                    ChronoUnit.MINUTES);
+            if (lastUpdatedInstant <= initialClusterTimeoutPeriod) {
+                return;
+            }
+            /*
+             * If the diagnosis action is set to true, onos is rebooted when required.
+             * If it is set to false, only a log is emitted stating that an onos reboot is needed.
+             */
+            if (!initialDiagnosisAction) {
+                log.info("onos Halt is needed as cluster node status is in: {} for Time out period: {}" +
+                                " for node {} with lastUpdatedInstant: {}" +
+                                " But, not onos is not rebooted as Diagnosis action is set to false",
+                        caService.getState(localNodeId), initialClusterTimeoutPeriod, localNodeId, lastUpdatedInstant);
+                return;
+            }
+            log.info("onos Halt is needed as cluster node status is in: {} for Time out period: {}" +
+                            " for node {} with lastUpdatedInstant: {}",
+                    caService.getState(localNodeId), initialClusterTimeoutPeriod, localNodeId, lastUpdatedInstant);
+            if (rebootNu < rebootRetryCount) {
+                updateRebootNu(rebootNu + 1);
+                log.info("Halting.Number of Halting:{}", rebootNu);
+                multicastReboot(true, Collections.singleton(localNodeId));
+            } else {
+                log.info("Halting is ignored as it is crossed limit of default Halting number");
+            }
+        } catch (Exception e) {
+            log.error("Exception occured in Cluster Node Diagnosis", e);
         }
     }
 
+    /**
+     * Computes the memory threshold.
+     * Obtains the total memory of the system where onos is running and
+     * takes 80% of it as the threshold.
+     */
+    private void getMemoryThreshold() {
+        String memStr = "";
+        try {
+            String outputMem;
+            Process getMemProc = Runtime.getRuntime().exec(CMD_FOR_TOTAL_MEMORY);
+            try (BufferedReader bufferedReader = new BufferedReader(
+                    new InputStreamReader(getMemProc.getInputStream()))) {
+                while ((outputMem = bufferedReader.readLine()) != null) {
+                    memStr += outputMem;
+                }
+            }
+
+            memStr = memStr.replaceAll("\n", "");
+            long totalMem = Long.parseLong(memStr);
+            //Taking 80% of total memory as threshold
+            memoryThreshold = (long) (totalMem * 0.80);
+            log.trace("totalMemory {}", memoryThreshold);
+        } catch (Exception e) {
+            log.error("Exception occured while getting Pid", e);
+        }
+    }
+
+    /**
+     * Gets pid of the onos service.
+     */
+    private void getPid() {
+        String pidStr = "";
+        try {
+            String outputPid;
+            Process getPidProc = Runtime.getRuntime().exec(CMD_FOR_PID);
+            try (BufferedReader bufferedReader = new BufferedReader(
+                    new InputStreamReader(getPidProc.getInputStream()))) {
+                while ((outputPid = bufferedReader.readLine()) != null) {
+                    pidStr += outputPid;
+                }
+            }
+
+            pid = Long.parseLong(pidStr);
+            log.trace("pid {}", pid);
+        } catch (Exception e) {
+            log.error("Exception occured while getting Pid", e);
+        }
+    }
+
+    /**
+     * Restarts onos if sdnc memory usage exceeds the memory threshold.
+     */
+    private void sdncMemoryDiagnosis() {
+        if (pid == 0) {
+            return;
+        }
+        if (memoryThreshold == 0) {
+            return;
+        }
+        try {
+            String[] getMemCmd = {
+                    "/bin/sh",
+                    "-c",
+                    " ps -p " + pid + " -o rss"
+            };
+
+            String outputMem;
+            String outputMemFinal = "";
+            Process getMemProc = Runtime.getRuntime().exec(getMemCmd);
+            try (BufferedReader bufferedReader = new BufferedReader(
+                    new InputStreamReader(getMemProc.getInputStream()))) {
+                while ((outputMem = bufferedReader.readLine()) != null) {
+                    outputMemFinal += outputMem;
+                }
+            }
+            // e.g. outputMemFinal -> "     RSS1031496"; strip whitespace, then drop the leading "RSS" header
+            outputMemFinal = outputMemFinal.replaceAll(EXTRA_SPACE, "");
+            String memTotalStr = outputMemFinal.substring(MEMORY_START_IDX);
+            if (memTotalStr.isEmpty()) {
+                log.error("Total Memory is empty");
+                return;
+            }
+            long memTotal = Long.parseLong(memTotalStr);
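+            // ps reports RSS in 1024-byte blocks; convert to bytes so it is comparable with memoryThreshold (from free -b)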
+            memTotal *= DATA_BLOCK_1024;
+            log.trace("memTotal {}", memTotal);
+
+            if (memTotal > memoryThreshold) {
+                log.info("onos Halt is needed as memory has exceeded. " +
+                                "The threshold is {} and used memory is {} for node {}.",
+                        memoryThreshold, memTotal, localNodeId);
+                multicastReboot(true, Collections.singleton(localNodeId));
+            }
+        } catch (Exception e) {
+            log.error("exception at Sdnc Memory Diagnosis", e);
+        }
+
+    }
+
+    /**
+     * Obtains the number of tcp socket descriptors.
+     */
+    private static class CallableTcpexecute implements Callable<Long> {
+        public Long call() throws Exception {
+            String[] cmdTcpFd = {"/bin/sh", "-c",
+                    "netstat -anp 2>/dev/null | grep " + pid + "/java | grep tcp | wc -l"};
+            getTcpProc = Runtime.getRuntime().exec(cmdTcpFd);
+            if (Objects.isNull(getTcpProc)) {
+                return 0L;
+            }
+            String outputTcp;
+            try (BufferedReader bufferedReader = new BufferedReader(
+                    new InputStreamReader(getTcpProc.getInputStream()))) {
+                outputTcp = bufferedReader.readLine();
+            }
+            if (Objects.isNull(outputTcp)) {
+                return 0L;
+            }
+            return Long.parseLong(outputTcp);
+        }
+    }
+
+    /**
+     * Obtains the number of udp socket descriptors.
+     */
+    private static class CallableUdpexecute implements Callable<Long> {
+        public Long call() throws Exception {
+            String[] cmdUdpFd = {"/bin/sh", "-c",
+                    "netstat -anp 2>/dev/null | grep " + pid + "/java | grep udp | wc -l"};
+            getUdpProc = Runtime.getRuntime().exec(cmdUdpFd);
+            if (Objects.isNull(getUdpProc)) {
+                return 0L;
+            }
+            String outputUdp;
+            try (BufferedReader bufferedReader = new BufferedReader(
+                    new InputStreamReader(getUdpProc.getInputStream()))) {
+                outputUdp = bufferedReader.readLine();
+            }
+            if (Objects.isNull(outputUdp)) {
+                return 0L;
+            }
+            return Long.parseLong(outputUdp);
+        }
+    }
+
+    /**
+     * Restarts onos if total number of socket descriptors exceeds threshold.
+     */
+    private void socketDescriptorsDiagnosis() {
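+        // The tcp and udp counts are gathered in parallel; netstat can hang, so each lookup is bounded by a 5-second timeout.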
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        Future<Long> futureTcp;
+        Future<Long> futureUdp;
+        futureTcp = executorService.submit(new CallableTcpexecute());
+        futureUdp = executorService.submit(new CallableUdpexecute());
+        try {
+            long tcpSds = futureTcp.get(5, TimeUnit.SECONDS);
+            long udpSds = futureUdp.get(5, TimeUnit.SECONDS);
+
+            long totalSockets = tcpSds + udpSds;
+            log.trace("total {}, tcp {}, udp {}", totalSockets, tcpSds, udpSds);
+            if (totalSockets > SOCKETS_THRESHOLD) {
+                log.info("onos Halt is needed as socket descriptors has exceeded " +
+                        "threshold limit for node {}", localNodeId);
+                multicastReboot(true, Collections.singleton(localNodeId));
+            }
+        } catch (TimeoutException e) {
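+            // netstat did not return in time: destroy the child processes, kill any stale netstat, and treat the node as unhealthy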
+            log.error("Timeout exception at Socket Descriptors diagnosis", e);
+            try {
+                if (Objects.nonNull(getTcpProc)) {
+                    getTcpProc.destroy();
+                }
+                if (Objects.nonNull(getUdpProc)) {
+                    getUdpProc.destroy();
+                }
+            } catch (Exception ex) {
+                log.error("Exception at destroying Tcp/Udp process", ex);
+            }
+
+            String outputPid;
+            try {
+                String pidStr = "";
+                Process getPidProc = Runtime.getRuntime().exec(CMD_FOR_NETSTAT_PID);
+                try (BufferedReader bufferedReader = new BufferedReader(
+                        new InputStreamReader(getPidProc.getInputStream()))) {
+                    while ((outputPid = bufferedReader.readLine()) != null) {
+                        pidStr += outputPid;
+                    }
+                }
+                if (!pidStr.equals("")) {
+                    Runtime.getRuntime().exec("kill " + pidStr);
+                }
+            } catch (Exception ex) {
+                log.error("Exception at killing netstat command", ex);
+            }
+
+            log.info("onos Halt is needed as timeout occured while finding total number of " +
+                    "socket descriptors for node {}", localNodeId);
+            multicastReboot(true, Collections.singleton(localNodeId));
+        } catch (Exception e) {
+            log.error("exception at Socket Descriptors diagnosis", e);
+        } finally {
+            futureTcp.cancel(true);
+            futureUdp.cancel(true);
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * Restarts onos if total number of threads and file descriptors exceeds threshold.
+     */
+    private void threadsAndFilesDescriptorDiagnosis() {
+        if (pid == 0) {
+            return;
+        }
+        try {
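+            // Sigar reports the total number of open file descriptors for the karaf process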
+            ProcFd procFd = sigar.getProcFd(pid);
+            long totalFd = procFd.getTotal();
+            log.trace("total fds{}", totalFd);
+            if (totalFd > FD_THRESHOLD) {
+                log.info("onos halt is needed as number of threads and file descriptors " +
+                        "has exceeded Threshold limit for node {}", localNodeId);
+                multicastReboot(true, Collections.singleton(localNodeId));
+            }
+        } catch (Exception e) {
+            log.error("Exception at sdnc file descriptor diagnosis", e);
+        }
+    }
+
+    private void sdncFileDescriptorDiagnosis() {
+        socketDescriptorsDiagnosis();
+        threadsAndFilesDescriptorDiagnosis();
+    }
 
     public void restartNode() {
         try {
@@ -363,7 +763,7 @@
         public void handle(ClusterMessage message) {
             String reqMsg = new String(message.payload());
             log.info("Cluster communication message subject{} and message {}",
-                     message.subject(), reqMsg);
+                    message.subject(), reqMsg);
             boolean flag = Boolean.parseBoolean(reqMsg.split(":")[1].trim());
             if (flag) {
                 System.setProperty("apache.karaf.removedb", "true");
diff --git a/apps/node-diagnosis/src/main/java/org/onosproject/diagnosis/impl/OsgiPropertyConstants.java b/apps/node-diagnosis/src/main/java/org/onosproject/diagnosis/impl/OsgiPropertyConstants.java
index 9fcdb5d..b959c2e 100644
--- a/apps/node-diagnosis/src/main/java/org/onosproject/diagnosis/impl/OsgiPropertyConstants.java
+++ b/apps/node-diagnosis/src/main/java/org/onosproject/diagnosis/impl/OsgiPropertyConstants.java
@@ -26,4 +26,13 @@
     static final String POLL_FREQUENCY_MINUTE = "pollFrequencyMinute";
     static final int DEFAULT_POLL_FREQUENCY_MINUTE = 1;
 
+    static final String REBOOT_RETRY_COUNT = "rebootRetryCount";
+    static final int DEFAULT_REBOOT_RETRY_COUNT = 10;
+
+    static final String INITIAL_CLUSTER_TIMEOUT_PERIOD = "initialClusterTimeoutPeriod";
+    static final int DEFAULT_CLUSTER_TIMEOUT_PERIOD = 4;
+
+    static final String INITIAL_DIAGNOSIS_ACTION = "initialDiagnosisAction";
+    static final boolean DEFAULT_DIAGNOSIS_ACTION = true;
+
 }