General link discovery module cleanup.

* Removed all events we don't use from the event system (switch and port
  events, link updates)
* Refactored event interface to two separate methods to match the other event
  interfaces
* Removed the LDUpdate class and moved all the event enums and classes to be
  internal to the LinkDiscoveryManager
* Removed all LinkTypes we no longer use and moved the one remaining type
  to the ILinkDiscoveryService. After this the ILinkDiscovery interface is
  no longer needed.
* Made Link immutable
* Removed the linkdiscovery.internal package as it only contained one class
* Readability improvements to LinkDiscoveryManager

Change-Id: Ifae97879aadc49b70a7b3d2294dcc540538c2cfc
diff --git a/src/main/java/net/onrc/onos/core/linkdiscovery/LinkDiscoveryManager.java b/src/main/java/net/onrc/onos/core/linkdiscovery/LinkDiscoveryManager.java
new file mode 100644
index 0000000..db35809
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/linkdiscovery/LinkDiscoveryManager.java
@@ -0,0 +1,1007 @@
+/**
+ *    Copyright 2011, Big Switch Networks, Inc.
+ *    Originally created by David Erickson, Stanford University
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License"); you may
+ *    not use this file except in compliance with the License. You may obtain
+ *    a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ *    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ *    License for the specific language governing permissions and limitations
+ *    under the License.
+ **/
+
+package net.onrc.onos.core.linkdiscovery;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitchListener;
+import net.floodlightcontroller.core.IUpdate;
+import net.floodlightcontroller.core.annotations.LogMessageCategory;
+import net.floodlightcontroller.core.annotations.LogMessageDoc;
+import net.floodlightcontroller.core.annotations.LogMessageDocs;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.core.module.FloodlightModuleException;
+import net.floodlightcontroller.core.module.IFloodlightModule;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.core.util.SingletonTask;
+import net.floodlightcontroller.restserver.IRestApiService;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.onrc.onos.core.linkdiscovery.web.LinkDiscoveryWebRoutable;
+import net.onrc.onos.core.packet.Ethernet;
+import net.onrc.onos.core.packet.LLDP;
+import net.onrc.onos.core.packet.OnosLldp;
+import net.onrc.onos.core.registry.IControllerRegistryService;
+import net.onrc.onos.core.util.SwitchPort;
+
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFPacketIn;
+import org.openflow.protocol.OFPacketOut;
+import org.openflow.protocol.OFPhysicalPort;
+import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
+import org.openflow.protocol.OFPhysicalPort.OFPortState;
+import org.openflow.protocol.OFPort;
+import org.openflow.protocol.OFPortStatus;
+import org.openflow.protocol.OFPortStatus.OFPortReason;
+import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.OFAction;
+import org.openflow.protocol.action.OFActionOutput;
+import org.openflow.protocol.action.OFActionType;
+import org.openflow.util.HexString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Discovers links between OpenFlow switches.
+ * <p/>
+ * Discovery is performed by sending probes (LLDP packets) over the links in
+ * the data plane. The LinkDiscoveryManager sends probes periodically on all
+ * ports on all connected switches. The probes contain the sending switch's
+ * DPID and outgoing port number. LLDP packets that are received (via an
+ * OpenFlow packet-in) indicate there is a link between the receiving port and
+ * the sending port, which was encoded in the LLDP. When the
+ * LinkDiscoveryManager observes a new link, a Link object is created and an
+ * event is fired for any event listeners.
+ * <p/>
+ * Links are removed for one of three reasons:
+ * <ul>
+ * <li>A probe has not been received on the link for an interval (the timeout
+ * interval)</li>
+ * <li>The port went down or was disabled (as observed by OpenFlow port-status
+ * messages), or the switch disconnected</li>
+ * <li>Link discovery was explicitly disabled on a port with the
+ * {@link #disableDiscoveryOnPort(long, short)} method</li>
+ * </ul>
+ * When the LinkDiscoveryManager removes a link it also fires an event for the
+ * listeners.
+ */
+@LogMessageCategory("Network Topology")
+public class LinkDiscoveryManager implements IOFMessageListener, IOFSwitchListener,
+        ILinkDiscoveryService, IFloodlightModule {
+
+    private static final Logger log =
+            LoggerFactory.getLogger(LinkDiscoveryManager.class);
+
+    private IFloodlightProviderService controller;
+    // NOTE(review): same service type as 'controller'; presumably the same instance -- confirm where both are assigned
+    private IFloodlightProviderService floodlightProvider;
+    private IThreadPoolService threadPool;
+    private IRestApiService restApi;
+    private IControllerRegistryService registryService;
+
+    // LLDP fields
+    private static final byte[] LLDP_STANDARD_DST_MAC_STRING =
+            HexString.fromHexString("01:80:c2:00:00:0e");
+    private static final long LINK_LOCAL_MASK = 0xfffffffffff0L;
+    private static final long LINK_LOCAL_VALUE = 0x0180c2000000L;
+
+    // Link discovery task details.
+    private SingletonTask discoveryTask;
+    private static final int DISCOVERY_TASK_INTERVAL = 1;
+    private static final int LINK_TIMEOUT = 35; // original 35 secs, aggressive 5 secs
+    private static final int LLDP_TO_ALL_INTERVAL = 15; //original 15 seconds, aggressive 2 secs.
+    private long lldpClock = 0;
+    // This value is intentionally kept higher than LLDP_TO_ALL_INTERVAL.
+    // If we want to identify link failures faster, we could decrease this
+    // value to a small number, say 1 or 2 sec.
+    private static final int LLDP_TO_KNOWN_INTERVAL = 20; // LLDP frequency for known links
+
+    private ReentrantReadWriteLock lock;
+
+    /**
+     * Map from link to the most recent time it was verified functioning.
+     */
+    protected Map<Link, LinkInfo> links;
+
+    /**
+     * Map from switch id to a set of all links with it as an endpoint.
+     */
+    protected Map<Long, Set<Link>> switchLinks;
+
+    /**
+     * Map from a id:port to the set of links containing it as an endpoint.
+     */
+    protected Map<NodePortTuple, Set<Link>> portLinks;
+
+    /**
+     * Listeners are called in the order they were added to the list.
+     */
+    private final List<ILinkDiscoveryListener> linkDiscoveryListeners
+            = new CopyOnWriteArrayList<>();
+
+    /**
+     * List of ports through which LLDPs are not sent.
+     */
+    private Set<NodePortTuple> suppressLinkDiscovery;
+
+    private enum UpdateType {
+        LINK_ADDED,   // LinkUpdate.dispatch() calls ILinkDiscoveryListener.linkAdded
+        LINK_REMOVED  // LinkUpdate.dispatch() calls ILinkDiscoveryListener.linkRemoved
+    }
+
+    /** Update that notifies every registered listener of one link change. */
+    private class LinkUpdate implements IUpdate {
+        private final Link link;
+        private final UpdateType operation;
+
+        public LinkUpdate(Link link, UpdateType operation) {
+            this.link = link;
+            this.operation = operation;
+        }
+
+        @Override
+        public void dispatch() {
+            // Parameterized logging formats the message only when trace is on.
+            log.trace("Dispatching link discovery update {} for {}",
+                    operation, link);
+            for (ILinkDiscoveryListener subscriber : linkDiscoveryListeners) {
+                switch (operation) {
+                case LINK_ADDED:
+                    subscriber.linkAdded(link);
+                    break;
+                case LINK_REMOVED:
+                    subscriber.linkRemoved(link);
+                    break;
+                default:
+                    log.warn("Unknown link update operation {}", operation);
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Gets the LLDP sending period in seconds.
+     * NOTE(review): this is LLDP_TO_KNOWN_INTERVAL; discoverLinks() paces on LLDP_TO_ALL_INTERVAL -- confirm.
+     * @return LLDP sending period in seconds.
+     */
+    public int getLldpFrequency() {
+        return LLDP_TO_KNOWN_INTERVAL;
+    }
+
+    /**
+     * Gets the LLDP timeout value in seconds.
+     * This is LINK_TIMEOUT, which timeOutLinks() scales by 1000 to compare against millisecond times.
+     * @return LLDP timeout value in seconds
+     */
+    public int getLldpTimeout() {
+        return LINK_TIMEOUT;
+    }
+
+    @Override
+    public Set<NodePortTuple> getDiscoveryDisabledPorts() {
+        return suppressLinkDiscovery; // the internal set itself is returned, not a copy
+    }
+
+    @Override
+    public void disableDiscoveryOnPort(long sw, short port) {
+        final NodePortTuple suppressedPort = new NodePortTuple(sw, port);
+        suppressLinkDiscovery.add(suppressedPort); // stop probing this port
+        deleteLinksOnPort(suppressedPort); // and drop the links already known on it
+    }
+
+    @Override
+    public void enableDiscoveryOnPort(long sw, short port) {
+        final NodePortTuple enabledPort = new NodePortTuple(sw, port);
+        suppressLinkDiscovery.remove(enabledPort); // allow probing again
+        discover(enabledPort); // and send a probe out of the port now
+    }
+
+    private boolean isLinkDiscoverySuppressed(long sw, short portNumber) {
+        return this.suppressLinkDiscovery.contains(new NodePortTuple(sw, portNumber)); // membership test on the suppressed-port set
+    }
+
+    /**
+     * Runs one discovery tick: expires stale links, advances the LLDP clock
+     * and probes all ports each time the clock wraps around to zero.
+     */
+    private void discoverLinks() {
+        timeOutLinks();
+        lldpClock = (lldpClock + 1) % LLDP_TO_ALL_INTERVAL;
+        if (lldpClock != 0) {
+            return;
+        }
+        log.debug("Sending LLDP out on all ports.");
+        discoverOnAllPorts();
+    }
+
+    /**
+     * Send LLDP on known ports.
+     */
+    protected void discoverOnKnownLinkPorts() {
+        // Copy the port set.
+        Set<NodePortTuple> nptSet = new HashSet<NodePortTuple>();
+        nptSet.addAll(portLinks.keySet());
+
+        // Send LLDP from each of them.
+        for (NodePortTuple npt : nptSet) {
+            discover(npt);
+        }
+    }
+
+    private void discover(NodePortTuple npt) {
+        discover(npt.getNodeId(), npt.getPortId()); // unpack the tuple and delegate to the (long, short) overload
+    }
+
+    private void discover(long sw, short port) {
+        sendDiscoveryMessage(sw, port, false); // false: isReverse is off for these probes
+    }
+
+    /**
+     * Send link discovery message out of a given switch port.
+     * The discovery message is a standard LLDP containing ONOS-specific TLVs.
+     *
+     * @param sw the switch to send on
+     * @param port the port to send out
+     * @param isReverse indicates whether the LLDP was sent as a response
+     */
+    @LogMessageDoc(level = "ERROR",
+            message = "Failure sending LLDP out port {port} on switch {switch}",
+            explanation = "An I/O error occured while sending LLDP message " +
+                    "to the switch.",
+            recommendation = LogMessageDoc.CHECK_SWITCH)
+    protected void sendDiscoveryMessage(long sw, short port,
+                                        boolean isReverse) {
+
+        IOFSwitch iofSwitch = floodlightProvider.getSwitches().get(sw);
+        if (iofSwitch == null) {
+            return;
+        }
+
+        if (port == OFPort.OFPP_LOCAL.getValue()) {
+            return;
+        }
+
+        OFPhysicalPort ofpPort = iofSwitch.getPort(port);
+
+        if (ofpPort == null) {
+            if (log.isTraceEnabled()) {
+                log.trace("Null physical port. sw={}, port={}", sw, port);
+            }
+            return;
+        }
+
+        if (isLinkDiscoverySuppressed(sw, port)) {
+            // Don't send LLDPs out of this port as suppressLLDPs set
+            return;
+        }
+
+        if (log.isTraceEnabled()) {
+            log.trace("Sending LLDP packet out of swich: {}, port: {}",
+                    sw, port);
+        }
+
+        OFPacketOut po = createLLDPPacketOut(sw, ofpPort, isReverse);
+
+        try {
+            iofSwitch.write(po, null);
+            iofSwitch.flush();
+        } catch (IOException e) {
+            log.error("Failure sending LLDP out port " + port + " on switch "
+                    + iofSwitch.getStringId(), e);
+        }
+
+    }
+
+    /**
+     * Builds the packet-out that carries one discovery LLDP for a port.
+     * The Ethernet frame is addressed to LLDP_STANDARD_DST_MAC_STRING, uses
+     * the port's hardware address as source and is output on that same port.
+     *
+     * @param dpid the dpid of the outgoing switch
+     * @param port the outgoing port
+     * @param isReverse whether this is a reverse LLDP or not
+     * @return Packet_out message with LLDP data
+     */
+    private OFPacketOut createLLDPPacketOut(long dpid,
+            final OFPhysicalPort port, boolean isReverse) {
+        // TODO optimize by not creating new packets each time
+        OnosLldp probe = new OnosLldp();
+        Ethernet frame = new Ethernet();
+        frame.setEtherType(Ethernet.TYPE_LLDP);
+        frame.setDestinationMACAddress(LLDP_STANDARD_DST_MAC_STRING);
+        frame.setPayload(probe);
+        frame.setPad(true);
+
+        OFPacketOut message = (OFPacketOut) floodlightProvider
+                .getOFMessageFactory().getMessage(OFType.PACKET_OUT);
+        message.setBufferId(OFPacketOut.BUFFER_ID_NONE);
+
+        OFActionOutput output = (OFActionOutput) floodlightProvider
+                .getOFMessageFactory().getAction(OFActionType.OUTPUT);
+        output.setPort(port.getPortNumber());
+        List<OFAction> actions = new LinkedList<OFAction>();
+        actions.add(output);
+        message.setActions(actions);
+
+        probe.setSwitch(dpid);
+        probe.setPort(port.getPortNumber());
+        probe.setReverse(isReverse);
+        frame.setSourceMACAddress(port.getHardwareAddress());
+
+        byte[] serialized = frame.serialize();
+        short actionsLength = (short) OFActionOutput.MINIMUM_LENGTH;
+        message.setActionsLength(actionsLength);
+        message.setPacketData(serialized);
+        message.setLength(
+                (short) (OFPacketOut.MINIMUM_LENGTH + actionsLength + serialized.length));
+        return message;
+    }
+
+    /**
+     * Send LLDPs to all switch-ports.
+     */
+    protected void discoverOnAllPorts() {
+        if (log.isTraceEnabled()) {
+            log.trace("Sending LLDP packets out of all the enabled ports on switch");
+        }
+
+        for (IOFSwitch sw : floodlightProvider.getSwitches().values()) {
+            if (sw.getEnabledPorts() == null) {
+                continue;
+            }
+            for (OFPhysicalPort ofp : sw.getEnabledPorts()) {
+                if (isLinkDiscoverySuppressed(sw.getId(), ofp.getPortNumber())) {
+                    continue;
+                }
+
+                sendDiscoveryMessage(sw.getId(), ofp.getPortNumber(), false);
+            }
+        }
+    }
+
+    @Override
+    public String getName() {
+        return "linkdiscovery"; // constant name; presumably the IOFMessageListener identifier -- confirm
+    }
+
+    /**
+     * Routes packet-in and port-status messages to their handlers; anything
+     * else (or a type/class mismatch) yields {@code Command.CONTINUE}.
+     */
+    @Override
+    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+        switch (msg.getType()) {
+            case PACKET_IN:
+                return (msg instanceof OFPacketIn)
+                        ? this.handlePacketIn(sw.getId(), (OFPacketIn) msg, cntx)
+                        : Command.CONTINUE;
+            case PORT_STATUS:
+                return (msg instanceof OFPortStatus)
+                        ? this.handlePortStatus(sw, (OFPortStatus) msg)
+                        : Command.CONTINUE;
+            default:
+                return Command.CONTINUE;
+        }
+    }
+
+    private Command handleLldp(LLDP lldp, long sw, OFPacketIn pi) {
+        // Handles one received LLDP: validates it, records the link, and may probe the reverse direction
+        IOFSwitch iofSwitch = floodlightProvider.getSwitches().get(sw);
+        if (iofSwitch == null) {
+            return Command.STOP;
+        }
+        // If LLDP is suppressed on this port, ignore received packet as well
+        if (isLinkDiscoverySuppressed(sw, pi.getInPort())) {
+            return Command.STOP;
+        }
+
+        // If this is a malformed LLDP, or not from us, exit
+        if (lldp.getPortId() == null || lldp.getPortId().getLength() != 3) {
+            return Command.CONTINUE;
+        }
+
+        // Verify this LLDP packet matches what we're looking for
+        byte[] packetData = pi.getPacketData();
+        if (!OnosLldp.isOnosLldp(packetData)) {
+            log.trace("Dropping LLDP that wasn't sent by ONOS");
+            return Command.STOP;
+        }
+        // Recover the sending (remote) switch and port from the probe
+        SwitchPort switchPort = OnosLldp.extractSwitchPort(packetData);
+        long remoteDpid = switchPort.dpid().value();
+        short remotePort = switchPort.port().value();
+        IOFSwitch remoteSwitch = floodlightProvider.getSwitches().get(
+                switchPort.dpid().value());
+
+        // physicalPort first holds the remote (source) port, later the receiving one
+        OFPhysicalPort physicalPort = null;
+        if (remoteSwitch != null) {
+            physicalPort = remoteSwitch.getPort(remotePort);
+            if (!remoteSwitch.portEnabled(remotePort)) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Ignoring link with disabled source port: " +
+                            "switch {} port {}", remoteSwitch, remotePort);
+                }
+                return Command.STOP;
+            }
+            if (suppressLinkDiscovery.contains(
+                    new NodePortTuple(remoteSwitch.getId(), remotePort))) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Ignoring link with suppressed src port: " +
+                            "switch {} port {}", remoteSwitch, remotePort);
+                }
+                return Command.STOP;
+            }
+        }
+        if (!iofSwitch.portEnabled(pi.getInPort())) {
+            if (log.isTraceEnabled()) {
+                log.trace("Ignoring link with disabled dest port: " +
+                        "switch {} port {}", sw, pi.getInPort());
+            }
+            return Command.STOP;
+        }
+        // A port state of 0 is used when the port object is not available
+        int srcPortState = (physicalPort != null) ? physicalPort.getState() : 0;
+        physicalPort = iofSwitch.getPort(pi.getInPort());
+        int dstPortState = (physicalPort != null) ? physicalPort.getState() : 0;
+
+        // The link runs from the remote (sending) port to the port the LLDP arrived on
+        Link lt = new Link(remoteDpid, remotePort, iofSwitch.getId(), pi.getInPort());
+
+        LinkInfo linkInfo = new LinkInfo(System.currentTimeMillis(),
+                System.currentTimeMillis(), srcPortState, dstPortState);
+
+        addOrUpdateLink(lt, linkInfo);
+
+        // Check if reverse link exists. If it doesn't and the forward link was
+        // first seen within a small interval, send a probe on the reverse link.
+        // NOTE(review): LINK_TIMEOUT is scaled by 1000 in timeOutLinks() but is
+        // subtracted unscaled from a millisecond time below -- confirm the intent.
+        boolean isReverse = OnosLldp.isReverse(lldp);
+
+        LinkInfo newLinkInfo = links.get(lt);
+        if (newLinkInfo != null && !isReverse) {
+            Link reverseLink = new Link(lt.getDst(), lt.getDstPort(),
+                    lt.getSrc(), lt.getSrcPort());
+            LinkInfo reverseInfo = links.get(reverseLink);
+            if (reverseInfo == null) {
+                // the reverse link does not exist.
+                if (newLinkInfo.getFirstSeenTime() >
+                        System.currentTimeMillis() - LINK_TIMEOUT) {
+                    this.sendDiscoveryMessage(lt.getDst(), lt.getDstPort(), true);
+                }
+            }
+        }
+
+        // Consume this message
+        return Command.STOP;
+    }
+
+    protected Command handlePacketIn(long sw, OFPacketIn pi,
+                                     FloodlightContext cntx) {
+        Ethernet eth =
+                IFloodlightProviderService.bcStore.get(cntx,
+                        IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
+
+        if (eth.getEtherType() == Ethernet.TYPE_LLDP) {
+            return handleLldp((LLDP) eth.getPayload(), sw, pi);
+        } else if (eth.getEtherType() < 1500) {
+            long destMac = eth.getDestinationMAC().toLong();
+            if ((destMac & LINK_LOCAL_MASK) == LINK_LOCAL_VALUE) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Ignoring packet addressed to 802.1D/Q " +
+                            "reserved address.");
+                }
+                return Command.STOP;
+            }
+        }
+
+        return Command.CONTINUE;
+    }
+
+    protected void addOrUpdateLink(Link lt, LinkInfo detectedLinkInfo) {
+        lock.writeLock().lock();
+        try {
+            LinkInfo existingInfo = links.get(lt);
+
+            LinkInfo newLinkInfo = new LinkInfo(
+                    ((existingInfo == null) ? detectedLinkInfo.getFirstSeenTime()
+                            : existingInfo.getFirstSeenTime()),
+                    detectedLinkInfo.getLastProbeReceivedTime(),
+                    detectedLinkInfo.getSrcPortState(),
+                    detectedLinkInfo.getDstPortState());
+
+            // Add new LinkInfo or update old LinkInfo
+            links.put(lt, newLinkInfo);
+
+            if (log.isTraceEnabled()) {
+                log.trace("addOrUpdateLink: {}", lt);
+            }
+
+            NodePortTuple srcNpt = new NodePortTuple(lt.getSrc(), lt.getSrcPort());
+            NodePortTuple dstNpt = new NodePortTuple(lt.getDst(), lt.getDstPort());
+
+            // If this is the first time we've seen the link, add the Link
+            // object to the data structures/indexes as well
+            if (existingInfo == null) {
+                // index it by switch source
+                if (!switchLinks.containsKey(lt.getSrc())) {
+                    switchLinks.put(lt.getSrc(), new HashSet<Link>());
+                }
+                switchLinks.get(lt.getSrc()).add(lt);
+
+                // index it by switch dest
+                if (!switchLinks.containsKey(lt.getDst())) {
+                    switchLinks.put(lt.getDst(), new HashSet<Link>());
+                }
+                switchLinks.get(lt.getDst()).add(lt);
+
+                // index both ends by switch:port
+                if (!portLinks.containsKey(srcNpt)) {
+                    portLinks.put(srcNpt, new HashSet<Link>());
+                }
+                portLinks.get(srcNpt).add(lt);
+
+                if (!portLinks.containsKey(dstNpt)) {
+                    portLinks.put(dstNpt, new HashSet<Link>());
+                }
+                portLinks.get(dstNpt).add(lt);
+
+                // Publish LINK_ADDED event
+                controller.publishUpdate(new LinkUpdate(lt, UpdateType.LINK_ADDED));
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Removes links from the data structures.
+     *
+     * @param linksToDelete the list of links to delete
+     */
+    protected void deleteLinks(List<Link> linksToDelete) {
+        lock.writeLock().lock();
+        try {
+            for (Link lt : linksToDelete) {
+                NodePortTuple srcNpt = new NodePortTuple(lt.getSrc(), lt.getSrcPort());
+                NodePortTuple dstNpt = new NodePortTuple(lt.getDst(), lt.getDstPort());
+
+                switchLinks.get(lt.getSrc()).remove(lt);
+                switchLinks.get(lt.getDst()).remove(lt);
+                if (switchLinks.containsKey(lt.getSrc()) &&
+                        switchLinks.get(lt.getSrc()).isEmpty()) {
+                    this.switchLinks.remove(lt.getSrc());
+                }
+                if (this.switchLinks.containsKey(lt.getDst()) &&
+                        this.switchLinks.get(lt.getDst()).isEmpty()) {
+                    this.switchLinks.remove(lt.getDst());
+                }
+
+                if (this.portLinks.get(srcNpt) != null) {
+                    this.portLinks.get(srcNpt).remove(lt);
+                    if (this.portLinks.get(srcNpt).isEmpty()) {
+                        this.portLinks.remove(srcNpt);
+                    }
+                }
+                if (this.portLinks.get(dstNpt) != null) {
+                    this.portLinks.get(dstNpt).remove(lt);
+                    if (this.portLinks.get(dstNpt).isEmpty()) {
+                        this.portLinks.remove(dstNpt);
+                    }
+                }
+
+                this.links.remove(lt);
+
+                controller.publishUpdate(new LinkUpdate(lt,
+                        UpdateType.LINK_REMOVED));
+
+                if (log.isTraceEnabled()) {
+                    log.trace("Deleted link {}", lt);
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Handles an OFPortStatus message from a switch. Links on the port are
+     * deleted, or their stored port state is updated, as needed.
+     *
+     * @param sw The switch that sent the port status message
+     * @param ps The OFPortStatus message
+     * @return Command.CONTINUE in every case
+     */
+    protected Command handlePortStatus(IOFSwitch sw, OFPortStatus ps) {
+
+        // If we do not control this switch, then we should not process its
+        // port status messages
+        if (!registryService.hasControl(sw.getId())) {
+            return Command.CONTINUE;
+        }
+
+        if (log.isTraceEnabled()) {
+            log.trace("handlePortStatus: Switch {} port #{} reason {}; " +
+                    "config is {} state is {}",
+                    new Object[]{sw.getStringId(),
+                            ps.getDesc().getPortNumber(),
+                            ps.getReason(),
+                            ps.getDesc().getConfig(),
+                            ps.getDesc().getState()});
+        }
+
+        short port = ps.getDesc().getPortNumber();
+        NodePortTuple npt = new NodePortTuple(sw.getId(), port);
+        boolean linkDeleted = false;
+        boolean linkInfoChanged = false;
+
+        lock.writeLock().lock();
+        try {
+            // if ps is a delete, or a modify where the port is down or
+            // configured down
+            if ((byte) OFPortReason.OFPPR_DELETE.ordinal() == ps.getReason() ||
+                    ((byte) OFPortReason.OFPPR_MODIFY.ordinal() ==
+                            ps.getReason() && !portEnabled(ps.getDesc()))) {
+
+                deleteLinksOnPort(npt);
+                linkDeleted = true;
+            } else if (ps.getReason() ==
+                    (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
+                // If ps is a port modification and the port state has changed
+                // in a way that affects links in the topology
+
+                if (this.portLinks.containsKey(npt)) {
+                    for (Link lt : this.portLinks.get(npt)) {
+                        LinkInfo linkInfo = links.get(lt);
+                        assert (linkInfo != null);
+                        LinkInfo newLinkInfo = null;
+
+                        if (lt.isSrcPort(npt) &&
+                                linkInfo.getSrcPortState() != ps.getDesc().getState()) {
+                            // If this port status is for the src port and the port
+                            // state has changed, create a new link info with the new state
+
+                            newLinkInfo = new LinkInfo(linkInfo.getFirstSeenTime(),
+                                    linkInfo.getLastProbeReceivedTime(),
+                                    ps.getDesc().getState(),
+                                    linkInfo.getDstPortState());
+                        } else if (lt.isDstPort(npt) &&
+                                linkInfo.getDstPortState() != ps.getDesc().getState()) {
+                            // If this port status is for the dst port and the port
+                            // state has changed, create a new link info with the new state
+
+                            newLinkInfo = new LinkInfo(linkInfo.getFirstSeenTime(),
+                                    linkInfo.getLastProbeReceivedTime(),
+                                    linkInfo.getSrcPortState(),
+                                    ps.getDesc().getState());
+                        }
+
+                        if (newLinkInfo != null) {
+                            linkInfoChanged = true;
+                            links.put(lt, newLinkInfo);
+                        }
+                    }
+                }
+            }
+
+            // No delete event and no stored state change: only a trace message is emitted
+            if (!linkDeleted && !linkInfoChanged) {
+                if (log.isTraceEnabled()) {
+                    log.trace("handlePortStatus: Switch {} port #{} reason {};" +
+                            " no links to update/remove",
+                            new Object[]{HexString.toHexString(sw.getId()),
+                                    ps.getDesc().getPortNumber(),
+                                    ps.getReason()});
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        if (!linkDeleted) {
+            // Send LLDP right away when port state is changed for faster
+            // cluster-merge. If it is a link delete then there is no need
+            // to send the LLDPs right away and instead we wait for the LLDPs
+            // to be sent on the timer as it is normally done.
+            // This is done outside the write-lock.
+            // Note: linkDeleted is set for every delete/port-down event, even with no links on the port.
+            processNewPort(npt.getNodeId(), npt.getPortId());
+        }
+        return Command.CONTINUE;
+    }
+
+    /**
+     * Process a new port.
+     * If link discovery is disabled on the port, then do nothing.
+     * Otherwise, send LLDP message.
+     *
+     * @param sw the dpid of the switch the port is on
+     * @param p the number of the port
+     */
+    private void processNewPort(long sw, short p) {
+        if (isLinkDiscoverySuppressed(sw, p)) {
+            // Do nothing as link discovery is suppressed.
+            return;
+        } else {
+            discover(sw, p);
+        }
+    }
+
+    /**
+     * Sends LLDP probes out of the enabled ports of a newly added switch.
+     * @param sw The IOFSwitch that connected to the controller
+     */
+    @Override
+    public void addedSwitch(IOFSwitch sw) {
+        if (sw.getEnabledPorts() == null) { // NOTE(review): guard and loop use different getters -- confirm
+            return;
+        }
+        for (Short p : sw.getEnabledPortNumbers()) {
+            processNewPort(sw.getId(), p);
+        }
+    }
+
+    /**
+     * Removes every link that has the disconnected switch as an endpoint.
+     * The removal itself, including the notifications, is done by deleteLinks().
+     *
+     * @param iofSwitch the switch that was removed
+     */
+    @Override
+    public void removedSwitch(IOFSwitch iofSwitch) {
+        final long dpid = iofSwitch.getId();
+
+        lock.writeLock().lock();
+        try {
+            if (!switchLinks.containsKey(dpid)) {
+                return;
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Handle switchRemoved. Switch {}; removing links {}",
+                        HexString.toHexString(dpid), switchLinks.get(dpid));
+            }
+            // Copy first: deleteLinks() edits the set stored in switchLinks.
+            final List<Link> doomed = new ArrayList<Link>(switchLinks.get(dpid));
+            deleteLinks(doomed);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Intentionally does nothing.
+     * <p>
+     * We don't react to the port changed notifications here; we listen for
+     * OFPortStatus messages directly. Might consider using this notifier
+     * instead.
+     *
+     * @param switchId the id of the switch whose port changed (unused)
+     */
+    @Override
+    public void switchPortChanged(Long switchId) {
+        // no-op
+    }
+
+    /**
+     * Delete links incident on a given switch port.
+     * <p>
+     * The links stored in {@code portLinks} for the port are copied and passed
+     * to {@code deleteLinks}; a port without an entry in {@code portLinks} is
+     * ignored. This method does not take the lock itself.
+     * NOTE(review): callers presumably hold the write lock already -- confirm.
+     *
+     * @param npt the port to delete links on
+     */
+    protected void deleteLinksOnPort(NodePortTuple npt) {
+        if (!this.portLinks.containsKey(npt)) {
+            // Nothing is recorded for this port.
+            return;
+        }
+
+        if (log.isTraceEnabled()) {
+            Object[] traceArgs = {
+                    HexString.toHexString(npt.getNodeId()),
+                    npt.getPortId(),
+                    this.portLinks.get(npt)};
+            log.trace("handlePortStatus: Switch {} port #{} removing links {}",
+                    traceArgs);
+        }
+
+        // Pass a copy rather than the set stored in portLinks.
+        deleteLinks(new ArrayList<Link>(this.portLinks.get(npt)));
+    }
+
+    /**
+     * Iterates through the list of links and deletes if the
+     * last discovery message reception time exceeds timeout values.
+     */
+    protected void timeOutLinks() {
+        List<Link> eraseList = new ArrayList<Link>();
+        Long curTime = System.currentTimeMillis();
+
+        // reentrant required here because deleteLink also write locks
+        lock.writeLock().lock();
+        try {
+            Iterator<Entry<Link, LinkInfo>> it =
+                    this.links.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<Link, LinkInfo> entry = it.next();
+                LinkInfo info = entry.getValue();
+
+                if ((info.getLastProbeReceivedTime() + (1000L * LINK_TIMEOUT)
+                        < curTime)) {
+                    eraseList.add(entry.getKey());
+                }
+            }
+
+            deleteLinks(eraseList);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Tells whether a physical port counts as enabled.
+     * <p>
+     * A port is enabled when it is non-null, the {@code OFPPC_PORT_DOWN} bit
+     * is not set in its config and the {@code OFPPS_LINK_DOWN} bit is not set
+     * in its state. The checks run in that order and stop at the first one
+     * that fails.
+     *
+     * @param port the port to inspect, may be {@code null}
+     * @return {@code true} if the port is enabled, {@code false} otherwise
+     */
+    private boolean portEnabled(OFPhysicalPort port) {
+        return port != null
+                && !((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0)
+                && !((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
+    }
+
+    /**
+     * Returns a snapshot of the currently known links.
+     * <p>
+     * The copy is taken under the read lock. It is a shallow copy: the
+     * returned map is new, but its keys and values are the same objects held
+     * by this manager.
+     *
+     * @return a new map from each known link to its {@code LinkInfo}
+     */
+    @Override
+    public Map<Link, LinkInfo> getLinks() {
+        lock.readLock().lock();
+        try {
+            return new HashMap<Link, LinkInfo>(links);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Registers a listener by adding it to {@code linkDiscoveryListeners}.
+     *
+     * @param listener the listener to add
+     */
+    @Override
+    public void addListener(ILinkDiscoveryListener listener) {
+        linkDiscoveryListeners.add(listener);
+    }
+
+    /**
+     * Unregisters a listener by removing it from
+     * {@code linkDiscoveryListeners}.
+     *
+     * @param listener the listener to remove
+     */
+    @Override
+    public void removeListener(ILinkDiscoveryListener listener) {
+        linkDiscoveryListeners.remove(listener);
+    }
+
+    /**
+     * Always answers {@code false}, whatever the message type and name.
+     *
+     * @param type the OpenFlow message type being asked about (ignored)
+     * @param name the name being asked about (ignored)
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isCallbackOrderingPrereq(OFType type, String name) {
+        return false;
+    }
+
+    /**
+     * Always answers {@code false}, whatever the message type and name.
+     *
+     * @param type the OpenFlow message type being asked about (ignored)
+     * @param name the name being asked about (ignored)
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isCallbackOrderingPostreq(OFType type, String name) {
+        return false;
+    }
+
+    // IFloodlightModule methods
+
+    /**
+     * Lists the service interfaces this module provides.
+     *
+     * @return a new collection containing only
+     *         {@code ILinkDiscoveryService.class}
+     */
+    @Override
+    public Collection<Class<? extends IFloodlightService>> getModuleServices() {
+        Collection<Class<? extends IFloodlightService>> provided = new ArrayList<>();
+        provided.add(ILinkDiscoveryService.class);
+        return provided;
+    }
+
+    /**
+     * Maps each provided service interface to its implementation.
+     *
+     * @return a new map with a single entry binding
+     *         {@code ILinkDiscoveryService.class} to this instance
+     */
+    @Override
+    public Map<Class<? extends IFloodlightService>, IFloodlightService>
+            getServiceImpls() {
+        Map<Class<? extends IFloodlightService>, IFloodlightService> implementations =
+                new HashMap<>();
+        implementations.put(ILinkDiscoveryService.class, this);
+        return implementations;
+    }
+
+    /**
+     * Lists the services this module depends on.
+     *
+     * @return a new collection with the Floodlight provider, thread pool,
+     *         REST API and controller registry service interfaces, in that
+     *         order
+     */
+    @Override
+    public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
+        Collection<Class<? extends IFloodlightService>> required = new ArrayList<>();
+        required.add(IFloodlightProviderService.class);
+        required.add(IThreadPoolService.class);
+        required.add(IRestApiService.class);
+        required.add(IControllerRegistryService.class);
+        return required;
+    }
+
+    /**
+     * Prepares the module before start-up.
+     * <p>
+     * Creates the lock and the empty containers that hold link state, and
+     * looks up the services this module uses from the module context.
+     *
+     * @param context the module context used to look up services
+     * @throws FloodlightModuleException declared by the module interface;
+     *         nothing in this method throws it directly
+     */
+    @Override
+    public void init(FloodlightModuleContext context)
+            throws FloodlightModuleException {
+        // Link state containers, all empty at this point.
+        this.lock = new ReentrantReadWriteLock();
+        this.links = new HashMap<>();
+        this.portLinks = new HashMap<>();
+        this.switchLinks = new HashMap<>();
+        this.suppressLinkDiscovery =
+                Collections.synchronizedSet(new HashSet<NodePortTuple>());
+
+        // Services provided by other modules.
+        floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
+        threadPool = context.getServiceImpl(IThreadPoolService.class);
+        restApi = context.getServiceImpl(IRestApiService.class);
+        registryService = context.getServiceImpl(IControllerRegistryService.class);
+    }
+
+    /**
+     * Starts the module.
+     * <p>
+     * Creates the periodic discovery task and schedules its first run after
+     * {@code DISCOVERY_TASK_INTERVAL} seconds, registers this module for
+     * PACKET_IN and PORT_STATUS OpenFlow messages and for switch events, and
+     * adds the link discovery REST routes.
+     * <p>
+     * The log message documentation below lists only the message this method
+     * actually logs.
+     *
+     * @param context the module context used to look up services
+     */
+    @Override
+    @LogMessageDocs({
+            @LogMessageDoc(level = "ERROR",
+                    message = "Exception in LLDP send timer.",
+                    explanation = "An unknown error occured while sending LLDP " +
+                            "messages to switches.",
+                    recommendation = LogMessageDoc.CHECK_SWITCH)
+    })
+    public void startUp(FloodlightModuleContext context) {
+        ScheduledExecutorService ses = threadPool.getScheduledExecutor();
+        controller = context.getServiceImpl(IFloodlightProviderService.class);
+
+        // The task re-arms itself from its own finally block, so a failed
+        // discovery round is logged but never stops the periodic cycle.
+        discoveryTask = new SingletonTask(ses, new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    discoverLinks();
+                } catch (Exception e) {
+                    log.error("Exception in LLDP send timer.", e);
+                } finally {
+                    log.trace("Rescheduling discovery task");
+                    discoveryTask.reschedule(DISCOVERY_TASK_INTERVAL,
+                            TimeUnit.SECONDS);
+                }
+            }
+        });
+
+        // Kick off the first discovery round.
+        discoveryTask.reschedule(DISCOVERY_TASK_INTERVAL, TimeUnit.SECONDS);
+
+        // Register for the OpenFlow messages we want to receive
+        floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
+        floodlightProvider.addOFMessageListener(OFType.PORT_STATUS, this);
+        // Register for switch updates
+        floodlightProvider.addOFSwitchListener(this);
+        restApi.addRestletRoutable(new LinkDiscoveryWebRoutable());
+    }
+}