distributed link fixes

Change-Id: Iefede001a76834599a5629d843a4325283e42711
diff --git a/core/api/src/main/java/org/onlab/onos/net/link/DefaultLinkDescription.java b/core/api/src/main/java/org/onlab/onos/net/link/DefaultLinkDescription.java
index 65d8d6d..037b796 100644
--- a/core/api/src/main/java/org/onlab/onos/net/link/DefaultLinkDescription.java
+++ b/core/api/src/main/java/org/onlab/onos/net/link/DefaultLinkDescription.java
@@ -1,5 +1,6 @@
 package org.onlab.onos.net.link;
 
+import com.google.common.base.MoreObjects;
 import org.onlab.onos.net.AbstractDescription;
 import org.onlab.onos.net.ConnectPoint;
 import org.onlab.onos.net.Link;
@@ -46,4 +47,11 @@
         return type;
     }
 
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper("Link").add("src", src())
+                                .add("dst", dst())
+                                .add("type", type()).toString();
+    }
+
 }
diff --git a/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java b/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java
index 7b35f1c..835c47e 100644
--- a/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java
@@ -201,10 +201,10 @@
             ConnectPoint dst = linkDescription.dst();
             // if we aren't master for the device associated with the ConnectPoint
             // we probably shouldn't be doing this.
-            if ((deviceService.getRole(src.deviceId()) != MastershipRole.MASTER) ||
-                    (deviceService.getRole(dst.deviceId()) != MastershipRole.MASTER)) {
-                return;
-            }
+
+//            if (deviceService.getRole(dst.deviceId()) != MastershipRole.MASTER) {
+//                return;
+//            }
             LinkEvent event = store.createOrUpdateLink(provider().id(),
                                                        linkDescription);
             if (event != null) {
@@ -220,14 +220,8 @@
 
             ConnectPoint src = linkDescription.src();
             ConnectPoint dst = linkDescription.dst();
-            // if we aren't master for the device associated with the ConnectPoint
-            // we probably shouldn't be doing this.
-            if ((deviceService.getRole(src.deviceId()) != MastershipRole.MASTER) ||
-                    (deviceService.getRole(dst.deviceId()) != MastershipRole.MASTER)) {
-                return;
-            }
-            LinkEvent event = store.removeLink(linkDescription.src(),
-                                               linkDescription.dst());
+
+            LinkEvent event = store.removeLink(src, dst);
             if (event != null) {
                 log.info("Link {} vanished", linkDescription);
                 post(event);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index 312d072..9e58904 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -239,11 +239,14 @@
         LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
         final LinkEvent event;
         final Timestamped<LinkDescription> mergedDesc;
-        synchronized (getOrCreateLinkDescriptions(key)) {
+        Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
+        synchronized (map) {
             event = createOrUpdateLinkInternal(providerId, deltaDesc);
-            mergedDesc = getOrCreateLinkDescriptions(key).get(providerId);
+            mergedDesc = map.get(providerId);
         }
 
+
+
         if (event != null) {
             log.info("Notifying peers of a link update topology event from providerId: "
                     + "{}  between src: {} and dst: {}",
@@ -252,8 +255,8 @@
                 notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
             } catch (IOException e) {
                 log.info("Failed to notify peers of a link update topology event from providerId: "
-                        + "{}  between src: {} and dst: {}",
-                        providerId, linkDescription.src(), linkDescription.dst());
+                                 + "{}  between src: {} and dst: {}",
+                         providerId, linkDescription.src(), linkDescription.dst());
             }
         }
         return event;
diff --git a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/AbstractOpenFlowSwitch.java b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/AbstractOpenFlowSwitch.java
index 00bbaca..f710726 100644
--- a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -288,6 +288,7 @@
             // The message wasn't really a Nicira role reply. We just
             // dispatch it to the OFMessage listeners in this case.
             this.handleMessage(m);
+            return;
         }
 
         RoleRecvStatus rrs = this.roleMan.deliverRoleReply(
@@ -301,8 +302,7 @@
                 this.transitionToEqualSwitch();
             }
         } else {
-            return;
-            //TODO: tell people that we failed.
+            this.disconnectSwitch();
         }
     }
 
diff --git a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java
index f611496..1dd673f 100644
--- a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java
@@ -71,6 +71,15 @@
         providerService = providerRegistry.register(this);
         deviceService.addListener(listener);
         packetSevice.addProcessor(listener, 0);
+        LinkDiscovery ld;
+        for (Device device : deviceService.getDevices()) {
+            ld = new LinkDiscovery(device, packetSevice, masterService,
+                              providerService, useBDDP);
+            discoverers.put(device.id(), ld);
+            for (Port p : deviceService.getPorts(device.id())) {
+                ld.addPort(p);
+            }
+        }
 
         log.info("Started");
     }
@@ -96,6 +105,10 @@
             LinkDiscovery ld = null;
             Device device = event.subject();
             Port port = event.port();
+            if (device == null) {
+                log.error("Device is null.");
+                return;
+            }
             switch (event.type()) {
                 case DEVICE_ADDED:
                     discoverers.put(device.id(),
@@ -144,6 +157,11 @@
                     break;
                 case DEVICE_UPDATED:
                 case DEVICE_MASTERSHIP_CHANGED:
+                    if (!discoverers.containsKey(device.id())) {
+                        discoverers.put(device.id(),
+                               new LinkDiscovery(device, packetSevice, masterService,
+                                      providerService, useBDDP));
+                    }
                     break;
                 default:
                     log.debug("Unknown event {}", event);
diff --git a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
index 3862437..75ce1da 100644
--- a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
+++ b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
@@ -257,10 +257,10 @@
                     sendProbes(portNumber);
                 } else {
                     // Update fast and slow ports
-                    //fastIterator.remove();
-                    //this.slowPorts.add(portNumber);
-                    //this.portProbeCount.remove(portNumber);
-                    this.portProbeCount.get(portNumber).set(0);
+                    fastIterator.remove();
+                    this.slowPorts.add(portNumber);
+                    this.portProbeCount.remove(portNumber);
+
 
                     ConnectPoint cp = new ConnectPoint(
                             device.id(),
diff --git a/providers/lldp/src/test/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProviderTest.java b/providers/lldp/src/test/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProviderTest.java
index 4410aaf..ff692d6 100644
--- a/providers/lldp/src/test/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProviderTest.java
+++ b/providers/lldp/src/test/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProviderTest.java
@@ -134,7 +134,7 @@
         deviceListener.event(portEvent(DeviceEvent.Type.PORT_ADDED, DID2, port(DID2, 1, false)));
 
 
-        assertNull("DPID exists",
+        assertNull("DeviceId exists",
                    provider.discoverers.get(DID2));
     }
 
@@ -394,7 +394,7 @@
 
         @Override
         public Iterable<Device> getDevices() {
-            return devices.values();
+            return Collections.EMPTY_LIST;
         }
 
         @Override
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index dfa2bea..54265ba 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -205,7 +205,7 @@
             pendingFMs.put(xid, installation);
         }
         pendingFutures.put(U32.f(batch.hashCode()), installation);
-        installation.verify(batch.hashCode());
+        installation.verify(U32.f(batch.hashCode()));
         return installation;
     }
 
@@ -226,7 +226,10 @@
 
         @Override
         public void switchRemoved(Dpid dpid) {
-            collectors.remove(dpid).stop();
+            FlowStatsCollector collector = collectors.remove(dpid);
+            if (collector != null) {
+                collector.stop();
+            }
         }
 
         @Override
@@ -321,7 +324,7 @@
         private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
 
         private final CountDownLatch countDownLatch;
-        private Integer pendingXid;
+        private Long pendingXid;
         private BatchState state;
 
         public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
@@ -391,7 +394,7 @@
         }
 
 
-        public void verify(Integer id) {
+        public void verify(Long id) {
             pendingXid = id;
             for (Dpid dpid : sws) {
                 OpenFlowSwitch sw = controller.getSwitch(dpid);
@@ -449,7 +452,9 @@
 
         private void cleanUp() {
             if (isDone() || isCancelled()) {
-                pendingFutures.remove(pendingXid);
+                if (pendingXid != null) {
+                    pendingFutures.remove(pendingXid);
+                }
                 for (Long xid : fms.keySet()) {
                     pendingFMs.remove(xid);
                 }