[ONOS-8129] Bug fixed for duplicate events when netconf device goes down along with some fixes for NPE in NetconfControllerImpl

Change-Id: I4ea6100ce4c4e56ff8219cf846f107f49c50cd2d
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
index daf92fa..c7e8923 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
@@ -141,6 +141,19 @@
     }
 
     /**
+     * If master, will execute the call locally else will use
+     * clusterCommunicationManager to execute at master controller.
+     * Meant only for internal synchronization and not to be used by applications.
+     *
+     * @param deviceId deviceId of device
+     * @param <T> for handling reply of generic type
+     * @return true or false
+     */
+    default <T> boolean pingDevice(DeviceId deviceId) {
+        return false;
+    }
+
+    /**
      * Get a contoller node Id .
      *
      * @return controller node Id
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index 90f6d14..c32004c 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -558,6 +558,27 @@
         }
     }
 
+    @Override
+    public <T> boolean pingDevice(DeviceId deviceId) {
+        NetconfProxyMessage proxyMessage = new DefaultNetconfProxyMessage(
+                NetconfProxyMessage.SubjectType.GET_DEVICE_CAPABILITIES_SET, deviceId, null, localNodeId);
+        CompletableFuture<T> reply;
+        if (deviceService.getRole(deviceId).equals(MastershipRole.MASTER)) {
+            reply = handleProxyMessage(proxyMessage);
+        } else {
+            reply = relayMessageToMaster(proxyMessage);
+        }
+        try {
+            T deviceCapabilities = reply.get();
+            log.debug("Get device capabilities from device : {} -> {}", deviceId, deviceCapabilities);
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("Error while getting device capabilities for device : {}", deviceId);
+            log.error("Error details : ", e);
+            return false;
+        }
+        return true;
+    }
+
     public <T> CompletableFuture<T> relayMessageToMaster(NetconfProxyMessage proxyMessage) {
         DeviceId deviceId = proxyMessage.deviceId();
 
@@ -592,6 +613,7 @@
     }
 
     private <T> CompletableFuture<T> handleProxyMessage(NetconfProxyMessage proxyMessage) {
+        countDownLatch = new CountDownLatch(1);
         try {
             switch (proxyMessage.subjectType()) {
                 case GET_DEVICE_CAPABILITIES_SET:
@@ -639,7 +661,7 @@
             NetconfProxyMessage.SubjectType subjectType = proxyMessage.subjectType();
             NetconfSession secureTransportSession;
 
-            if (netconfDeviceMap.get(deviceId).isMasterSession()) {
+            if (netconfDeviceMap.get(deviceId) != null && netconfDeviceMap.get(deviceId).isMasterSession()) {
                 secureTransportSession = netconfDeviceMap.get(deviceId).getSession();
             } else {
                 throw new NetconfException("Ssh session not present");
@@ -709,7 +731,7 @@
             NetconfProxyMessage.SubjectType subjectType = proxyMessage.subjectType();
             NetconfSession secureTransportSession;
 
-            if (netconfDeviceMap.get(deviceId).isMasterSession()) {
+            if (netconfDeviceMap.get(deviceId) != null && netconfDeviceMap.get(deviceId).isMasterSession()) {
                 secureTransportSession = netconfDeviceMap.get(deviceId).getSession();
             } else {
                 throw new NetconfException("SSH session not present");
diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
index 9ff8002..2a0533f 100644
--- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
+++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
@@ -321,6 +321,15 @@
     }
 
     @Override
+    public boolean isAvailable(DeviceId deviceId) {
+        boolean isReachable = isTcpConnectionAvailable(deviceId);
+        if (isReachable) {
+            return controller.pingDevice(deviceId);
+        }
+        return false;
+    }
+
+    @Override
     public boolean isReachable(DeviceId deviceId) {
         boolean sessionExists =
                 Optional.ofNullable(controller.getDevicesMap().get(deviceId))
@@ -332,33 +341,7 @@
 
         //FIXME this is a workaround util device state is shared
         // between controller instances.
-        Device device = deviceService.getDevice(deviceId);
-        String ip;
-        int port;
-        if (device != null) {
-            ip = device.annotations().value(IPADDRESS);
-            port = Integer.parseInt(device.annotations().value(PORT));
-        } else {
-            Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
-            ip = info.getLeft();
-            port = info.getMiddle();
-        }
-        // FIXME just opening TCP session probably is not the appropriate
-        // method to test reachability.
-        //test connection to device opening a socket to it.
-        log.debug("Testing reachability for {}:{}", ip, port);
-        Socket socket = new Socket();
-        try {
-            socket.connect(new InetSocketAddress(ip, port), 1000);
-            log.debug("rechability of {}, {}, {}", deviceId, socket.isConnected(), !socket.isClosed());
-            boolean isConnected = socket.isConnected() && !socket.isClosed();
-            socket.close();
-            return isConnected;
-        } catch (IOException e) {
-            log.info("Device {} is not reachable", deviceId);
-            log.debug("  error details", e);
-            return false;
-        }
+        return isTcpConnectionAvailable(deviceId);
     }
 
     @Override
@@ -409,6 +392,36 @@
         controller.disconnectDevice(deviceId, true);
     }
 
+    private boolean isTcpConnectionAvailable(DeviceId deviceId) {
+        Device device = deviceService.getDevice(deviceId);
+        String ip;
+        int port;
+        if (device != null) {
+            ip = device.annotations().value(IPADDRESS);
+            port = Integer.parseInt(device.annotations().value(PORT));
+        } else {
+            Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
+            ip = info.getLeft();
+            port = info.getMiddle();
+        }
+        // FIXME just opening TCP session probably is not the appropriate
+        // method to test reachability.
+        //test connection to device opening a socket to it.
+        log.debug("Testing reachability for {}:{}", ip, port);
+        Socket socket = new Socket();
+        try {
+            socket.connect(new InetSocketAddress(ip, port), 1000);
+            log.debug("rechability of {}, {}, {}", deviceId, socket.isConnected(), !socket.isClosed());
+            boolean isConnected = socket.isConnected() && !socket.isClosed();
+            socket.close();
+            return isConnected;
+        } catch (IOException e) {
+            log.info("Device {} is not reachable", deviceId);
+            log.debug("  error details", e);
+            return false;
+        }
+    }
+
     private ScheduledFuture schedulePolling() {
         return pollingExecutor.scheduleAtFixedRate(exceptionSafe(this::checkAndUpdateDevices),
                 pollFrequency / 10,