Fix for ONOS-3183

Stream processes by Lambda queries in collection objects has a known performance bottleneck. For each create and update link data operations, this  kind of queries are frequently used and this causes inefficient topology discovery operation. For to solve that problem i analize thread dump during discovery process and i saw that especially getAllProviders method uses high cpu. than i change the provider search operation with additional map data which holds linkkey and its providers. by this way provider search operation not only gets faster but also uses minimum resource. Before that change, i can discover only 4XX number of switches at one controller, but later number grows to 15XX switches

Change-Id: I65ed71b7f295917c818b2f9227d0fc070aa4a29b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java
index 4f56f40..3dc048f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java
@@ -71,6 +71,7 @@
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -95,8 +96,8 @@
 @Component(immediate = true, enabled = true)
 @Service
 public class ECLinkStore
-    extends AbstractStore<LinkEvent, LinkStoreDelegate>
-    implements LinkStore {
+        extends AbstractStore<LinkEvent, LinkStoreDelegate>
+        implements LinkStore {
 
     /**
      * Modes for dealing with newly discovered links.
@@ -117,8 +118,10 @@
     private final Logger log = getLogger(getClass());
 
     private final Map<LinkKey, Link> links = Maps.newConcurrentMap();
+    private final Map<LinkKey, Set<ProviderId>> linkProviders = Maps.newConcurrentMap();
     private EventuallyConsistentMap<Provided<LinkKey>, LinkDescription> linkDescriptions;
 
+
     private ApplicationId appId;
 
     private static final MessageSubject LINK_INJECT_MESSAGE = new MessageSubject("inject-link-request");
@@ -188,10 +191,10 @@
                 }).build();
 
         clusterCommunicator.addSubscriber(LINK_INJECT_MESSAGE,
-                SERIALIZER::decode,
-                this::injectLink,
-                SERIALIZER::encode,
-                SharedExecutors.getPoolThreadExecutor());
+                                          SERIALIZER::decode,
+                                          this::injectLink,
+                                          SERIALIZER::encode,
+                                          SharedExecutors.getPoolThreadExecutor());
 
         linkDescriptions.addListener(linkTracker);
 
@@ -202,6 +205,7 @@
     public void deactivate() {
         linkDescriptions.removeListener(linkTracker);
         linkDescriptions.destroy();
+        linkProviders.clear();
         links.clear();
         clusterCommunicator.removeSubscriber(LINK_INJECT_MESSAGE);
         netCfgService.removeListener(cfgListener);
@@ -273,10 +277,10 @@
                 return null;
             }
             return Futures.getUnchecked(clusterCommunicator.sendAndReceive(new Provided<>(linkDescription, providerId),
-                    LINK_INJECT_MESSAGE,
-                    SERIALIZER::encode,
-                    SERIALIZER::decode,
-                    dstNodeId));
+                                                                           LINK_INJECT_MESSAGE,
+                                                                           SERIALIZER::encode,
+                                                                           SERIALIZER::decode,
+                                                                           dstNodeId));
         }
     }
 
@@ -295,16 +299,24 @@
     private LinkDescription createOrUpdateLinkInternal(LinkDescription current, LinkDescription updated) {
         if (current != null) {
             // we only allow transition from INDIRECT -> DIRECT
-            return  new DefaultLinkDescription(
-                        current.src(),
-                        current.dst(),
-                        current.type() == DIRECT ? DIRECT : updated.type(),
-                        current.isExpected(),
-                        union(current.annotations(), updated.annotations()));
+            return new DefaultLinkDescription(
+                    current.src(),
+                    current.dst(),
+                    current.type() == DIRECT ? DIRECT : updated.type(),
+                    current.isExpected(),
+                    union(current.annotations(), updated.annotations()));
         }
         return updated;
     }
 
+    private Set<ProviderId> createOrUpdateLinkProviders(Set<ProviderId> current, ProviderId providerId) {
+        if (current == null) {
+            current = Sets.newConcurrentHashSet();
+        }
+        current.add(providerId);
+        return current;
+    }
+
     private LinkEvent refreshLinkCache(LinkKey linkKey) {
         AtomicReference<LinkEvent.Type> eventType = new AtomicReference<>();
         Link link = links.compute(linkKey, (key, existingLink) -> {
@@ -313,11 +325,11 @@
                 eventType.set(LINK_ADDED);
                 return newLink;
             } else if (existingLink.state() != newLink.state() ||
-                       existingLink.isExpected() != newLink.isExpected() ||
-                        (existingLink.type() == INDIRECT && newLink.type() == DIRECT) ||
-                        !AnnotationsUtil.isEqual(existingLink.annotations(), newLink.annotations())) {
-                    eventType.set(LINK_UPDATED);
-                    return newLink;
+                    existingLink.isExpected() != newLink.isExpected() ||
+                    (existingLink.type() == INDIRECT && newLink.type() == DIRECT) ||
+                    !AnnotationsUtil.isEqual(existingLink.annotations(), newLink.annotations())) {
+                eventType.set(LINK_UPDATED);
+                return newLink;
             } else {
                 return existingLink;
             }
@@ -326,20 +338,16 @@
     }
 
     private Set<ProviderId> getAllProviders(LinkKey linkKey) {
-        return linkDescriptions.keySet()
-                               .stream()
-                               .filter(key -> key.key().equals(linkKey))
-                               .map(key -> key.providerId())
-                               .collect(Collectors.toSet());
+        return linkProviders.getOrDefault(linkKey, Sets.newConcurrentHashSet());
     }
 
     private ProviderId getBaseProviderId(LinkKey linkKey) {
         Set<ProviderId> allProviders = getAllProviders(linkKey);
         if (allProviders.size() > 0) {
             return allProviders.stream()
-                               .filter(p -> !p.isAncillary())
-                               .findFirst()
-                               .orElse(Iterables.getFirst(allProviders, null));
+                    .filter(p -> !p.isAncillary())
+                    .findFirst()
+                    .orElse(Iterables.getFirst(allProviders, null));
         }
         return null;
     }
@@ -356,11 +364,14 @@
         annotations.set(merge(annotations.get(), base.annotations()));
 
         getAllProviders(linkKey).stream()
-                                .map(p -> new Provided<>(linkKey, p))
-                                .forEach(key -> {
-                                    annotations.set(merge(annotations.get(),
-                                                          linkDescriptions.get(key).annotations()));
-        });
+                .map(p -> new Provided<>(linkKey, p))
+                .forEach(key -> {
+                    LinkDescription linkDescription = linkDescriptions.get(key);
+                    if (linkDescription != null) {
+                        annotations.set(merge(annotations.get(),
+                                              linkDescription.annotations()));
+                    }
+                });
 
         Link.State initialLinkState;
 
@@ -375,8 +386,6 @@
         }
 
 
-
-
         return DefaultLink.builder()
                 .providerId(baseProviderId)
                 .src(src)
@@ -394,8 +403,8 @@
         // Note: INDIRECT -> DIRECT transition only
         // so that BDDP discovered Link will not overwrite LDDP Link
         if (oldLink.state() != newLink.state() ||
-            (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
-            !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
+                (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
+                !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
 
             links.put(key, newLink);
             return new LinkEvent(LINK_UPDATED, newLink);
@@ -447,6 +456,7 @@
         Link removedLink = links.remove(linkKey);
         if (removedLink != null) {
             getAllProviders(linkKey).forEach(p -> linkDescriptions.remove(new Provided<>(linkKey, p)));
+            linkProviders.remove(linkKey);
             return new LinkEvent(LINK_REMOVED, removedLink);
         }
         return null;
@@ -475,9 +485,12 @@
         @Override
         public void event(EventuallyConsistentMapEvent<Provided<LinkKey>, LinkDescription> event) {
             if (event.type() == PUT) {
+                linkProviders.compute(event.key().key(), (k, v) ->
+                        createOrUpdateLinkProviders(v, event.key().providerId()));
                 notifyDelegate(refreshLinkCache(event.key().key()));
             } else if (event.type() == REMOVE) {
                 notifyDelegate(purgeLinkCache(event.key().key()));
+                linkProviders.remove(event.key().key());
             }
         }
     }
@@ -521,8 +534,8 @@
     // Configuration properties factory
     private final ConfigFactory factory =
             new ConfigFactory<ApplicationId, CoreConfig>(APP_SUBJECT_FACTORY,
-                                                        CoreConfig.class,
-                                                        "core") {
+                                                         CoreConfig.class,
+                                                         "core") {
                 @Override
                 public CoreConfig createConfig() {
                     return new CoreConfig();