Netconf device provider to handle series of consecutive Mastership 
requests properly

Change-Id: I1b81faae97b364aaae86128e629adff13f103dab
diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
index 8281f4e..e54e90f 100644
--- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
+++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
@@ -18,6 +18,7 @@
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Striped;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -71,6 +72,7 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -88,6 +90,8 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
 
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
@@ -177,6 +181,7 @@
     protected final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
     private ApplicationId appId;
     private boolean active;
+    private final Striped<Lock> deviceLocks = Striped.lock(30);
 
 
     @Activate
@@ -271,7 +276,8 @@
         if (active) {
             switch (newRole) {
                 case MASTER:
-                    initiateConnection(deviceId, newRole);
+                    withDeviceLock(
+                            () -> initiateConnection(deviceId, newRole), deviceId).run();
                     log.debug("Accepting mastership role change to {} for device {}", newRole, deviceId);
                     break;
                 case STANDBY:
@@ -327,9 +333,13 @@
         // method to test reachability.
         //test connection to device opening a socket to it.
         log.debug("Testing reachability for {}:{}", ip, port);
-        try (Socket socket = new Socket(ip, port)) {
+        Socket socket = new Socket();
+        try {
+            socket.connect(new InetSocketAddress(ip, port), 1000);
             log.debug("rechability of {}, {}, {}", deviceId, socket.isConnected(), !socket.isClosed());
-            return socket.isConnected() && !socket.isClosed();
+            boolean isConnected = socket.isConnected() && !socket.isClosed();
+            socket.close();
+            return isConnected;
         } catch (IOException e) {
             log.info("Device {} is not reachable", deviceId);
             log.debug("  error details", e);
@@ -426,6 +436,10 @@
             log.trace("{} not my scheme, skipping", deviceId);
             return;
         }
+        if (!isReachable(deviceId)) {
+            log.warn("Can't connect to device {}", deviceId);
+            return;
+        }
         DeviceDescription deviceDescription = createDeviceRepresentation(deviceId, config);
         log.debug("Connecting NETCONF device {}, on {}:{} with username {}",
                   deviceId, config.ip(), config.port(), config.username());
@@ -434,11 +448,6 @@
         if (deviceService.getDevice(deviceId) == null) {
             providerService.deviceConnected(deviceId, deviceDescription);
         }
-        try {
-            checkAndUpdateDevice(deviceId, deviceDescription, true);
-        } catch (Exception e) {
-            log.error("Unhandled exception checking {}", deviceId, e);
-        }
     }
 
     private void checkAndUpdateDevice(DeviceId deviceId, DeviceDescription deviceDescription, boolean newlyConnected) {
@@ -451,7 +460,7 @@
         if (!isReachable && deviceService.isAvailable(deviceId)) {
             providerService.deviceDisconnected(deviceId);
             return;
-        } else if (newlyConnected) {
+        } else if (newlyConnected &&  mastershipService.isLocalMaster(deviceId)) {
             updateDeviceDescription(deviceId, deviceDescription, device);
         }
         if (isReachable && deviceService.isAvailable(deviceId) &&
@@ -540,6 +549,7 @@
                 .set(IPADDRESS, ipAddress)
                 .set(PORT, String.valueOf(config.port()))
                 .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase())
+                .set(AnnotationKeys.PROVIDER_MARK_ONLINE, String.valueOf(true))
                 .build();
         return new DefaultDeviceDescription(
                 deviceId.uri(),
@@ -568,8 +578,13 @@
     private void initiateConnection(DeviceId deviceId, MastershipRole newRole) {
         try {
             if (isReachable(deviceId)) {
-                controller.connectDevice(deviceId);
-                providerService.receivedRoleReply(deviceId, newRole, MastershipRole.MASTER);
+                NetconfDevice device = controller.connectDevice(deviceId);
+                if (device != null) {
+                    providerService.receivedRoleReply(deviceId, newRole, MastershipRole.MASTER);
+                } else {
+                    providerService.receivedRoleReply(deviceId, newRole, MastershipRole.NONE);
+                }
+
             }
         } catch (Exception e) {
             if (deviceService.getDevice(deviceId) != null) {
@@ -614,6 +629,24 @@
         }
     }
 
+    private <U> U withDeviceLock(Supplier<U> task, DeviceId deviceId) {
+        final Lock lock = deviceLocks.get(deviceId);
+        lock.lock();
+        try {
+            return task.get();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private Runnable withDeviceLock(Runnable task, DeviceId deviceId) {
+        // Wrapper of withDeviceLock(Supplier, ...) for void tasks.
+        return () -> withDeviceLock(() -> {
+            task.run();
+            return null;
+        }, deviceId);
+    }
+
     /**
      * Listener for configuration events.
      */
@@ -645,12 +678,23 @@
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            executor.execute(() -> discoverPorts(event.subject().id()));
+            DeviceId deviceId = event.subject().id();
+            if (event.type() == DeviceEvent.Type.DEVICE_ADDED && !deviceService.isAvailable(event.subject().id())) {
+                try {
+                    checkAndUpdateDevice(deviceId, null, true);
+                } catch (Exception e) {
+                    log.error("Unhandled exception checking {}", deviceId, e);
+                }
+            }
+            if (deviceService.isAvailable(event.subject().id())) {
+                executor.execute(() -> discoverPorts(event.subject().id()));
+            }
         }
 
         @Override
         public boolean isRelevant(DeviceEvent event) {
-            if (event.type() != DeviceEvent.Type.DEVICE_ADDED) {
+            if (event.type() != DeviceEvent.Type.DEVICE_ADDED &&
+                    event.type() != DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED) {
                 return false;
             }
             if (mastershipService.getMasterFor(event.subject().id()) == null) {
diff --git a/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java b/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
index 53e7838..39498f3 100644
--- a/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
+++ b/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
@@ -263,12 +263,8 @@
         assertNotNull(providerService);
         assertTrue("Event should be relevant", provider.cfgListener.isRelevant(deviceAddedEvent));
         available = true;
-        provider.cfgListener.event(deviceAddedEvent);
-
-        deviceAdded.await();
-        assertEquals("Device should be added", 1, deviceStore.getDeviceCount());
-        assertTrue("Device incorrectly added" + NETCONF_DEVICE_ID_STRING,
-                   devices.containsKey(DeviceId.deviceId(NETCONF_DEVICE_ID_STRING)));
+        assertFalse("Device should not be reachable" + NETCONF_DEVICE_ID_STRING,
+                provider.isReachable(DeviceId.deviceId(NETCONF_DEVICE_ID_STRING)));
         devices.clear();
     }
 
@@ -341,6 +337,11 @@
         DeviceListener listener = null;
 
         @Override
+        public boolean isAvailable(DeviceId deviceId) {
+            return true;
+        }
+
+        @Override
         public Device getDevice(DeviceId deviceId) {
             if (deviceId.toString().equals(NETCONF_DEVICE_ID_STRING)) {
                 return null;