File read properly updates topology information

Change-Id: I1e78e06e701cef45e5454d6e928967187174f8e5
diff --git a/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java b/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
index ad5a03f..37cbfca 100644
--- a/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
+++ b/providers/null/link/src/main/java/org/onosproject/provider/nil/link/impl/NullLinkProvider.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.provider.nil.link.impl;
 
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -49,7 +50,6 @@
 import java.util.Dictionary;
 import java.util.Iterator;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
@@ -87,6 +87,7 @@
     private static final int CHECK_DURATION = 10;
     private static final int DEFAULT_RATE = 0; // usec
     private static final int REFRESH_RATE = 3; // sec
+    private static final DeviceId DEFAULT = DeviceId.deviceId("null:ffffffffffffffff");
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
@@ -109,10 +110,10 @@
             .newConcurrentMap();
 
     // Link descriptions
-    private final Set<LinkDescription> linkDescrs = Sets.newConcurrentHashSet();
+    private final List<LinkDescription> linkDescrs = Lists.newArrayList();
 
     // Thread to description map
-    private final List<Set<LinkDescription>> linkTasks = new ArrayList<>(THREADS);
+    private final List<List<LinkDescription>> linkTasks = Lists.newArrayList();
 
     private ScheduledExecutorService linkDriver =
             Executors.newScheduledThreadPool(THREADS, groupedThreads("onos/null", "link-driver-%d"));
@@ -140,10 +141,9 @@
         modified(context);
 
         if (flicker) {
-            allocateLinks();
             for (int i = 0; i < linkTasks.size(); i++) {
-                Set<LinkDescription> links = linkTasks.get(i);
-                LinkDriver driver = new LinkDriver(links, i);
+                List<LinkDescription> links = linkTasks.get(i);
+                LinkDriver driver = new LinkDriver(links);
                 links.forEach(v -> {
                     DeviceId d = v.src().deviceId();
                     Set<LinkDriver> s = driverMap.getOrDefault(d, Sets.newConcurrentHashSet());
@@ -157,7 +157,9 @@
                 }
             }
         } else {
-            linkDriver.schedule(new LinkDriver(linkDescrs, 0), 3, TimeUnit.SECONDS);
+            LinkDriver driver = new LinkDriver(linkDescrs);
+            driverMap.put(DEFAULT, Sets.newHashSet(driver));
+            linkDriver.schedule(driver, 3, TimeUnit.SECONDS);
         }
         log.info("started");
     }
@@ -208,8 +210,12 @@
         if (eventRate != newRate && newRate > 0) {
             eventRate = newRate;
             flicker = true;
+            allocateLinks();
         } else if (newRate == 0) {
             flicker = false;
+            // reconfigure driver - dumb but should work.
+            driverMap.getOrDefault(DEFAULT, Sets.newHashSet()).forEach(
+                    v -> v.setTasks(linkDescrs));
         }
 
         log.info("Using settings: eventRate={}, topofile={}", eventRate, cfgFile);
@@ -218,6 +224,7 @@
     // parse simplified dot-like topology graph
     private void readGraph(String path, NodeId me) {
         log.info("path: {}, local: {}", path, me);
+        Set<LinkDescription> read = Sets.newHashSet();
         BufferedReader br = null;
         try {
             br = new BufferedReader(new FileReader(path));
@@ -239,7 +246,7 @@
                             if (cur.trim().contains("}")) {
                                 break;
                             }
-                            readLink(cur.trim().split(" "), me);
+                            readLink(cur.trim().split(" "), me, read);
                             cur = br.readLine();
                         }
                     } else {
@@ -264,10 +271,16 @@
                 log.warn("Could not close topology file: {}", e);
             }
         }
+        synchronized (linkDescrs) {
+            if (!read.isEmpty()) {
+                linkDescrs.clear();
+                linkDescrs.addAll(read);
+            }
+        }
     }
 
     // parses a link descriptor to make a LinkDescription
-    private void readLink(String[] linkArr, NodeId me) {
+    private void readLink(String[] linkArr, NodeId me, Set<LinkDescription> links) {
         if (linkArr[0].startsWith("#")) {
             return;
         }
@@ -302,11 +315,11 @@
         ConnectPoint dst = new ConnectPoint(ddev, PortNumber.portNumber(cp2[1]));
         // both link types have incoming half-link
         LinkDescription in = new DefaultLinkDescription(dst, src, DIRECT);
-        linkDescrs.add(in);
+        links.add(in);
         if (op.equals("--")) {
             // bidirectional - within our node's island, make outbound link
             LinkDescription out = new DefaultLinkDescription(src, dst, DIRECT);
-            linkDescrs.add(out);
+            links.add(out);
             log.info("Created bidirectional link: {}, {}", out, in);
         } else if (op.equals("->")) {
             log.info("Created unidirectional link: {}", in);
@@ -336,7 +349,7 @@
             index = (lcount % THREADS);
             log.info("allocation: total={}, index={}", linkDescrs.size(), lcount, index);
             if (linkTasks.size() <= index) {
-                linkTasks.add(index, Sets.newHashSet(ld));
+                linkTasks.add(index, Lists.newArrayList(ld));
             } else {
                 linkTasks.get(index).add(ld);
             }
@@ -383,16 +396,13 @@
 
         long startTime;
 
-        LinkDriver(Set<LinkDescription> links, int index) {
-            for (LinkDescription link : links) {
-                tasks.add(link);
-            }
-            startTime = System.currentTimeMillis();
+        LinkDriver(List<LinkDescription> links) {
+            setTasks(links);
+            startTime = System.currentTimeMillis(); // yes, this will start off inaccurate
         }
 
         @Override
         public void run() {
-            log.info("Thread started for links {}", tasks);
             if (flicker) {
                 flicker();
             } else {
@@ -428,9 +438,9 @@
 
         private void refresh() {
             if (!linkDriver.isShutdown() || !tasks.isEmpty()) {
-                log.info("iter {} refresh_links for {} links", counter, linkDescrs.size());
+                log.info("iter {} refresh_links", counter);
 
-                for (LinkDescription desc : linkDescrs) {
+                for (LinkDescription desc : tasks) {
                     providerService.linkDetected(desc);
                     log.info("iteration {}, {}", counter, desc);
                 }
@@ -451,6 +461,21 @@
                 }
             }
         }
+
+        public void setTasks(List<LinkDescription> links) {
+            HashMultimap<ConnectPoint, ConnectPoint> nm = HashMultimap.create();
+            links.forEach(v -> nm.put(v.src(), v.dst()));
+            // remove and send linkVanished for stale links.
+            synchronized (this) {
+                for (LinkDescription l : tasks) {
+                    if (!nm.containsEntry(l.src(), l.dst())) {
+                        providerService.linkVanished(l);
+                    }
+                }
+                tasks.clear();
+                tasks.addAll(links);
+            }
+        }
     }
 
 }