Change sequential stream to parallel stream for link discovery

If some devices are late to response result in to link discovery, remained switches's information will be pended to update.
So, to reduce the execution time and to prevent delay of update information, change sequential stream to parallel stream.

Change-Id: I4fb8dfd66b7e784d160994f87ffeee47b1a497aa
diff --git a/providers/link/src/main/java/org/onosproject/provider/linkdiscovery/impl/LinkDiscoveryProvider.java b/providers/link/src/main/java/org/onosproject/provider/linkdiscovery/impl/LinkDiscoveryProvider.java
index dba7add..7f29cc5 100644
--- a/providers/link/src/main/java/org/onosproject/provider/linkdiscovery/impl/LinkDiscoveryProvider.java
+++ b/providers/link/src/main/java/org/onosproject/provider/linkdiscovery/impl/LinkDiscoveryProvider.java
@@ -16,6 +16,8 @@
 
 package org.onosproject.provider.linkdiscovery.impl;
 
+import com.google.common.collect.Sets;
+import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
@@ -48,9 +50,16 @@
 
 import java.util.Dictionary;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
@@ -69,6 +78,7 @@
         property = {
                 POLL_DELAY_SECONDS + ":Integer=" + POLL_DELAY_SECONDS_DEFAULT,
                 POLL_FREQUENCY_SECONDS + ":Integer=" + POLL_FREQUENCY_SECONDS_DEFAULT,
+                LINK_DISCOVERY_TIMEOUT_SECONDS + ":Integer=" + POLL_DISCOVERY_TIMEOUT_DEFAULT,
         })
 public class LinkDiscoveryProvider extends AbstractProvider
         implements LinkProvider {
@@ -84,6 +94,10 @@
     /** Frequency (in seconds) for polling link discovery. */
     protected static int linkPollFrequencySeconds = POLL_FREQUENCY_SECONDS_DEFAULT;
 
+    /** Discovery timeout (in seconds) for polling arp discovery. */
+    protected static int linkDiscoveryTimeoutSeconds = POLL_DISCOVERY_TIMEOUT_DEFAULT;
+
+    private static final int POOL_SIZE = 10;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected LinkProviderRegistry providerRegistry;
@@ -95,6 +109,9 @@
     protected MastershipService mastershipService;
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
+    protected ExecutorService linkDiscoveryExecutor =
+            Executors.newFixedThreadPool(POOL_SIZE, groupedThreads("onos/linkdiscoveryprovider",
+                                                                   "link-collector-%d", log));
     protected ScheduledExecutorService executor =
             newScheduledThreadPool(2, groupedThreads("onos/netconf-link",
                                                      "discovery-%d"));
@@ -105,6 +122,7 @@
     private InternalDeviceListener deviceListener = new InternalDeviceListener();
     private ApplicationId appId;
     private ScheduledFuture<?> scheduledTask;
+    private ForkJoinPool scheduledTaskPool = new ForkJoinPool(POOL_SIZE);
 
     /**
      * Creates a provider with the supplied identifier.
@@ -153,10 +171,13 @@
 
             int newPollFrequency = getNewPollFrequency(properties, linkPollFrequencySeconds);
             int newPollDelay = getNewPollDealy(properties, linkPollDelaySeconds);
+            int newDiscoveryTimeout = getNewDiscoveryTimeout(properties, linkDiscoveryTimeoutSeconds);
             if (newPollFrequency != linkPollFrequencySeconds ||
-                    newPollDelay != linkPollDelaySeconds) {
+                    newPollDelay != linkPollDelaySeconds ||
+                    newDiscoveryTimeout != linkDiscoveryTimeoutSeconds) {
                 linkPollFrequencySeconds = newPollFrequency;
                 linkPollDelaySeconds = newPollDelay;
+                linkDiscoveryTimeoutSeconds = newDiscoveryTimeout;
                 //stops the old scheduled task
                 scheduledTask.cancel(true);
                 //schedules new task at the new polling rate
@@ -188,19 +209,63 @@
         return newPollFrequency;
     }
 
+    private int getNewDiscoveryTimeout(Dictionary<?, ?> properties, int discoveryTimeout) {
+        int newDiscoveryTimeout;
+        try {
+            String s = get(properties, LINK_DISCOVERY_TIMEOUT_SECONDS);
+            newDiscoveryTimeout = isNullOrEmpty(s) ? discoveryTimeout : Integer.parseInt(s.trim());
+        } catch (NumberFormatException | ClassCastException e) {
+            newDiscoveryTimeout = POLL_DISCOVERY_TIMEOUT_DEFAULT;
+            log.error("Cannot update Discovery Timeout", e);
+        }
+        return newDiscoveryTimeout;
+    }
+
     private ScheduledFuture schedulePolling() {
+        log.info("schedule: discoverLinksTasks with {} sec, {} sec",
+                linkPollDelaySeconds, linkPollFrequencySeconds);
         return executor.scheduleAtFixedRate(this::discoverLinksTasks,
                                             linkPollDelaySeconds,
                                             linkPollFrequencySeconds,
                                             SECONDS);
     }
 
+    private void discoverLinks(Device device) {
+        DeviceId deviceId = device.id();
+        Set<LinkDescription> response = null;
+        try {
+            response = CompletableFuture.supplyAsync(() -> device.as(LinkDiscovery.class).getLinks(),
+                    linkDiscoveryExecutor)
+                    .exceptionally(e -> {
+                        log.error("Exception is occurred during update the links. Device id {} {}", deviceId, e);
+                        return null;
+                    })
+                    .get(linkDiscoveryTimeoutSeconds, SECONDS);
+        } catch (TimeoutException e) {
+            log.error("Timout is occurred during update the links. Device id {}, Timeout {}",
+                    deviceId, linkDiscoveryTimeoutSeconds);
+        } catch (InterruptedException | ExecutionException e) {
+            log.warn("Exception is occurred during update the links. Device id {}, Timeout {}",
+                    deviceId, linkDiscoveryTimeoutSeconds);
+        }
+        if (Objects.isNull(response)) {
+            return;
+        }
+        evaluateLinks(deviceId, response);
+    }
+
     private void discoverLinksTasks() {
-        deviceService.getAvailableDevices().forEach(device -> {
-            if (isSupported(device)) {
-                evaluateLinks(device.id(), device.as(LinkDiscovery.class).getLinks());
-            }
-        });
+        try {
+            scheduledTaskPool.submit(exceptionSafe(() -> {
+                Tools.stream(deviceService.getAvailableDevices()).parallel().forEach(device -> exceptionSafe(() -> {
+                    if (isSupported(device)) {
+                        discoverLinks(device);
+                    }
+                }).run());
+            })).get();
+        } catch (Exception e) {
+            log.info("Unhandled exception {}", e.getMessage());
+        }
     }
 
     private void evaluateLinks(DeviceId deviceId, Set<LinkDescription> discoveredLinksDesc) {
@@ -213,10 +278,7 @@
                 .stream()
                 .filter(link -> {
                     String value = link.annotations().value(AnnotationKeys.PROTOCOL);
-                    if (value != null && value.equals(SCHEME_NAME.toUpperCase())) {
-                        return true;
-                    }
-                    return false;
+                    return Objects.equals(value, SCHEME_NAME.toUpperCase());
                 })
                 .collect(Collectors.toSet());
 
@@ -266,16 +328,17 @@
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            if ((event.type() == DeviceEvent.Type.DEVICE_ADDED)) {
-                executor.execute(() -> event.subject().as(LinkDiscovery.class).getLinks()
-                        .forEach(linkDesc -> {
-                            providerService.linkDetected(new DefaultLinkDescription(
-                                    linkDesc.src(), linkDesc.dst(), linkDesc.type(), false,
-                                    DefaultAnnotations.builder()
-                                            .putAll(linkDesc.annotations())
-                                            .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase())
-                                            .build()));
-                        }));
+            Device device = event.subject();
+            switch (event.type()) {
+                case DEVICE_ADDED:
+                    executor.execute(() -> discoverLinks(device));
+                    break;
+                case DEVICE_REMOVED:
+                    evaluateLinks(device.id(), Sets.newHashSet());
+                    break;
+                default:
+                    log.debug("No implemented action for other DeviceEvents for the device {}", device.id());
+                    break;
             }
         }
 
@@ -284,4 +347,14 @@
             return isSupported(event.subject());
         }
     }
+
+    private Runnable exceptionSafe(Runnable runnable) {
+        return () -> {
+            try {
+                runnable.run();
+            } catch (Exception e) {
+                log.error("Unhandled Exception", e);
+            }
+        };
+    }
 }
diff --git a/providers/link/src/main/java/org/onosproject/provider/linkdiscovery/impl/OsgiPropertyConstants.java b/providers/link/src/main/java/org/onosproject/provider/linkdiscovery/impl/OsgiPropertyConstants.java
index 4335fac..1ead70a 100644
--- a/providers/link/src/main/java/org/onosproject/provider/linkdiscovery/impl/OsgiPropertyConstants.java
+++ b/providers/link/src/main/java/org/onosproject/provider/linkdiscovery/impl/OsgiPropertyConstants.java
@@ -29,4 +29,7 @@
     public static final String POLL_FREQUENCY_SECONDS = "linkPollFrequencySeconds";
     public static final int POLL_FREQUENCY_SECONDS_DEFAULT = 10;
 
+    public static final String LINK_DISCOVERY_TIMEOUT_SECONDS = "linkDiscoveryTimeoutSeconds";
+    public static final int POLL_DISCOVERY_TIMEOUT_DEFAULT = 300;
+
 }
diff --git a/providers/link/src/test/java/org/onosproject/provider/linkdiscovery/impl/LinkDiscoveryProviderTest.java b/providers/link/src/test/java/org/onosproject/provider/linkdiscovery/impl/LinkDiscoveryProviderTest.java
index c054a11..105a9ab 100644
--- a/providers/link/src/test/java/org/onosproject/provider/linkdiscovery/impl/LinkDiscoveryProviderTest.java
+++ b/providers/link/src/test/java/org/onosproject/provider/linkdiscovery/impl/LinkDiscoveryProviderTest.java
@@ -77,6 +77,7 @@
                     Hashtable<String, Integer> props = new Hashtable<>();
                     props.put("linkPollFrequencySeconds", 2);
                     props.put("linkPollDelaySeconds", 1);
+                    props.put("linkDiscoveryTimeoutSeconds", 1);
                     return props;
                 }
             };
@@ -160,6 +161,8 @@
                      provider.linkPollFrequencySeconds);
         assertEquals("Incorrect polling delay , should be default", 20,
                      provider.linkPollDelaySeconds);
+        assertEquals("Incorrect polling discovery delay , should be default", 300,
+                     provider.linkDiscoveryTimeoutSeconds);
     }
 
     @Test
@@ -169,6 +172,8 @@
                      provider.linkPollFrequencySeconds);
         assertEquals("Incorrect polling delay , should be default", 1,
                      provider.linkPollDelaySeconds);
+        assertEquals("Incorrect polling discovery delay , should be default", 1,
+                     provider.linkDiscoveryTimeoutSeconds);
 
     }
 
@@ -223,6 +228,11 @@
         }
 
         @Override
+        public Device getDevice(DeviceId deviceId) {
+            return device1;
+        }
+
+        @Override
         public void addListener(DeviceListener listener) {
             deviceListeners.add(listener);
         }
@@ -302,4 +312,4 @@
         }
 
     }
-}
\ No newline at end of file
+}