Support multiple Links from a Port

- Support multiple Links from a Port (ONOS-1574,ONOS-1391)

Change-Id: I0b53a950d0ad1e37bbedb726fee2d35df4ff0d60
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyImpl.java b/src/main/java/net/onrc/onos/core/topology/TopologyImpl.java
index 5ce3729..e6e6763 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyImpl.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyImpl.java
@@ -1,13 +1,19 @@
 package net.onrc.onos.core.topology;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.core.util.Dpid;
 import net.onrc.onos.core.util.PortNumber;
@@ -33,8 +39,9 @@
     private final Multimap<SwitchPort, Device> devices;
     private final ConcurrentMap<MACAddress, Device> mac2Device;
 
-    private final ConcurrentMap<SwitchPort, Link> outgoingLinks;
-    private final ConcurrentMap<SwitchPort, Link> incomingLinks;
+    // SwitchPort -> (type -> Link)
+    private final ConcurrentMap<SwitchPort, ConcurrentMap<String, Link>> outgoingLinks;
+    private final ConcurrentMap<SwitchPort, ConcurrentMap<String, Link>> incomingLinks;
 
     private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     private Lock readLock = readWriteLock.readLock();
@@ -128,29 +135,108 @@
 
     @Override
     public Link getOutgoingLink(Dpid dpid, PortNumber number) {
-        return outgoingLinks.get(new SwitchPort(dpid, number));
+        return getOutgoingLink(new SwitchPort(dpid, number));
     }
 
     @Override
     public Link getOutgoingLink(SwitchPort port) {
-        return outgoingLinks.get(port);
+        Map<String, Link> links = outgoingLinks.get(port);
+        return getPacketLinkIfExists(links);
+    }
+
+    // TODO remove when we no longer need packet fall back behavior
+    /**
+     * Gets the "packet" link if such exists, if not return whatever found.
+     *
+     * @param links Collection of links to search from
+     * @return Link instance found or null if no link exists
+     */
+    private Link getPacketLinkIfExists(Map<String, Link> links) {
+
+        if (links == null) {
+            return null;
+        }
+
+        Link link = links.get(TopologyElement.TYPE_PACKET);
+        if (link != null) {
+            // return packet link
+            return link;
+        } else {
+            // return whatever found
+            Iterator<Link> it = links.values().iterator();
+            if (it.hasNext()) {
+                return it.next();
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Link getOutgoingLink(Dpid dpid, PortNumber number, String type) {
+        return getOutgoingLink(new SwitchPort(dpid, number), type);
+    }
+
+    @Override
+    public Link getOutgoingLink(SwitchPort port, String type) {
+        Map<String, Link> links = outgoingLinks.get(port);
+        return links.get(type);
+    }
+
+    @Override
+    public Collection<Link> getOutgoingLinks(SwitchPort port) {
+        return Collections.unmodifiableCollection(outgoingLinks.get(port).values());
     }
 
     @Override
     public Link getIncomingLink(Dpid dpid, PortNumber number) {
-        return incomingLinks.get(new SwitchPort(dpid, number));
+        return getIncomingLink(new SwitchPort(dpid, number));
     }
 
     @Override
     public Link getIncomingLink(SwitchPort port) {
-        return incomingLinks.get(port);
+        Map<String, Link> links = incomingLinks.get(port);
+        return getPacketLinkIfExists(links);
+    }
+
+    @Override
+    public Link getIncomingLink(Dpid dpid, PortNumber number, String type) {
+        return getIncomingLink(new SwitchPort(dpid, number), type);
+    }
+
+    @Override
+    public Link getIncomingLink(SwitchPort port, String type) {
+        Map<String, Link> links = incomingLinks.get(port);
+        return links.get(type);
+    }
+
+    @Override
+    public Collection<Link> getIncomingLinks(SwitchPort port) {
+        return Collections.unmodifiableCollection(incomingLinks.get(port).values());
     }
 
     @Override
     public Link getLink(Dpid srcDpid, PortNumber srcNumber,
                         Dpid dstDpid, PortNumber dstNumber) {
 
-        Link link = getOutgoingLink(srcDpid, srcNumber);
+        final SwitchPort dstSwitchPort = new SwitchPort(dstDpid, dstNumber);
+        Collection<Link> links = getOutgoingLinks(new SwitchPort(srcDpid, srcNumber));
+        for (Link link : links) {
+            if (link == null) {
+                continue;
+            }
+            if (link.getDstPort().asSwitchPort().equals(dstSwitchPort)) {
+                return link;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Link getLink(Dpid srcDpid, PortNumber srcNumber,
+                        Dpid dstDpid, PortNumber dstNumber,
+                        String type) {
+
+        Link link = getOutgoingLink(srcDpid, srcNumber, type);
         if (link == null) {
             return null;
         }
@@ -165,17 +251,53 @@
 
     @Override
     public Iterable<Link> getLinks() {
-        return Collections.unmodifiableCollection(outgoingLinks.values());
+        List<Link> links = new ArrayList<>();
+
+        for (Map<String, Link> portLinks : outgoingLinks.values()) {
+            links.addAll(portLinks.values());
+        }
+        return links;
     }
 
+    @GuardedBy("topology.writeLock")
     protected void putLink(Link link) {
-        outgoingLinks.put(link.getSrcPort().asSwitchPort(), link);
-        incomingLinks.put(link.getDstPort().asSwitchPort(), link);
+        putLinkMap(outgoingLinks, link.getSrcPort().asSwitchPort(), link);
+        putLinkMap(incomingLinks, link.getDstPort().asSwitchPort(), link);
     }
 
+    /**
+     * Helper method to update outgoingLinks, incomingLinks.
+     *
+     * @param linkMap outgoingLinks or incomingLinks
+     * @param port Map key
+     * @param link Link to add
+     */
+    @GuardedBy("topology.writeLock")
+    private void putLinkMap(ConcurrentMap<SwitchPort, ConcurrentMap<String, Link>> linkMap,
+                            SwitchPort port, Link link) {
+        ConcurrentMap<String, Link> portLinks = new ConcurrentHashMap<String, Link>(3);
+        portLinks.put(link.getType(), link);
+        Map<String, Link> existing = linkMap.putIfAbsent(
+                    port,
+                    portLinks);
+        if (existing != null) {
+            // no conditional update here
+            existing.put(link.getType(), link);
+        }
+    }
+
+    @GuardedBy("topology.writeLock")
     protected void removeLink(Link link) {
-        outgoingLinks.remove(link.getSrcPort().asSwitchPort(), link);
-        incomingLinks.remove(link.getDstPort().asSwitchPort(), link);
+        ConcurrentMap<String, Link> portLinks = outgoingLinks.get(link.getSrcPort().asSwitchPort());
+        if (portLinks != null) {
+            // no conditional update here
+            portLinks.remove(link.getType());
+        }
+        portLinks = incomingLinks.get(link.getDstPort().asSwitchPort());
+        if (portLinks != null) {
+            // no conditional update here
+            portLinks.remove(link.getType());
+        }
     }
 
     @Override