Merge pull request #520 from jonohart/arprefactor

Refactored the notification mechanism for getting other instances to send packet-outs.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index c42488c..6d953f1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -462,6 +462,11 @@
 		MACAddress srcMacAddress = installedFlowPath.flowEntryMatch().srcMac();
 		MACAddress dstMacAddress = installedFlowPath.flowEntryMatch().dstMac();
 		
+		if (srcMacAddress == null || dstMacAddress == null) {
+			// Not our flow path, ignore
+			return;
+		}
+		
 		Collection<PacketToPush> packets;
 		synchronized (lock) {
 			log.debug("Flow {} has been installed, sending queued packets",
diff --git a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
index 1dbfdcb..3bb2878 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
@@ -112,7 +112,7 @@
  */
 @LogMessageCategory("Network Topology")
 public class LinkDiscoveryManager
-implements IOFMessageListener, IOFSwitchListener, 
+implements IOFMessageListener, IOFSwitchListener,
 ILinkDiscoveryService, IFloodlightModule {
 	protected IFloodlightProviderService controller;
     protected final static Logger log = LoggerFactory.getLogger(LinkDiscoveryManager.class);
@@ -125,7 +125,7 @@
 
 
     // LLDP and BDDP fields
-    private static final byte[] LLDP_STANDARD_DST_MAC_STRING = 
+    private static final byte[] LLDP_STANDARD_DST_MAC_STRING =
             HexString.fromHexString("01:80:c2:00:00:0e");
     private static final long LINK_LOCAL_MASK  = 0xfffffffffff0L;
     private static final long LINK_LOCAL_VALUE = 0x0180c2000000L;
@@ -135,27 +135,27 @@
     private static final String LLDP_BSN_DST_MAC_STRING = "ff:ff:ff:ff:ff:ff";
 
 
-    // Direction TLVs are used to indicate if the LLDPs were sent 
+    // Direction TLVs are used to indicate if the LLDPs were sent
     // periodically or in response to a recieved LLDP
     private static final byte TLV_DIRECTION_TYPE = 0x73;
     private static final short TLV_DIRECTION_LENGTH = 1;  // 1 byte
     private static final byte TLV_DIRECTION_VALUE_FORWARD[] = {0x01};
     private static final byte TLV_DIRECTION_VALUE_REVERSE[] = {0x02};
-    private static final LLDPTLV forwardTLV 
+    private static final LLDPTLV forwardTLV
     = new LLDPTLV().
-    setType((byte)TLV_DIRECTION_TYPE).
-    setLength((short)TLV_DIRECTION_LENGTH).
+    setType(TLV_DIRECTION_TYPE).
+    setLength(TLV_DIRECTION_LENGTH).
     setValue(TLV_DIRECTION_VALUE_FORWARD);
 
-    private static final LLDPTLV reverseTLV 
+    private static final LLDPTLV reverseTLV
     = new LLDPTLV().
-    setType((byte)TLV_DIRECTION_TYPE).
-    setLength((short)TLV_DIRECTION_LENGTH).
+    setType(TLV_DIRECTION_TYPE).
+    setLength(TLV_DIRECTION_LENGTH).
     setValue(TLV_DIRECTION_VALUE_REVERSE);
 
     // Link discovery task details.
     protected SingletonTask discoveryTask;
-    protected final int DISCOVERY_TASK_INTERVAL = 1; 
+    protected final int DISCOVERY_TASK_INTERVAL = 1;
     protected final int LINK_TIMEOUT = 35; // original 35 secs, aggressive 5 secs
     protected final int LLDP_TO_ALL_INTERVAL = 15 ; //original 15 seconds, aggressive 2 secs.
     protected long lldpClock = 0;
@@ -206,7 +206,7 @@
     /* topology aware components are called in the order they were added to the
      * the array */
     protected ArrayList<ILinkDiscoveryListener> linkDiscoveryAware;
-    
+
     protected class LinkUpdate extends LDUpdate {
 
 		public LinkUpdate(LDUpdate old) {
@@ -263,7 +263,7 @@
      */
     protected Map<NodePortTuple, Long> broadcastDomainPortTimeMap;
 
-    /** 
+    /**
      * Get the LLDP sending period in seconds.
      * @return LLDP sending period in seconds.
      */
@@ -283,6 +283,7 @@
         return portLinks;
     }
 
+    @Override
     public Set<NodePortTuple> getSuppressLLDPsInfo() {
         return suppressLinkDiscovery;
     }
@@ -291,6 +292,7 @@
      * Add a switch port to the suppressed LLDP list.
      * Remove any known links on the switch port.
      */
+    @Override
     public void AddToSuppressLLDPs(long sw, short port)
     {
         NodePortTuple npt = new NodePortTuple(sw, port);
@@ -302,7 +304,8 @@
      * Remove a switch port from the suppressed LLDP list.
      * Discover links on that switchport.
      */
-    public void RemoveFromSuppressLLDPs(long sw, short port) 
+    @Override
+    public void RemoveFromSuppressLLDPs(long sw, short port)
     {
         NodePortTuple npt = new NodePortTuple(sw, port);
         this.suppressLinkDiscovery.remove(npt);
@@ -317,6 +320,7 @@
         return false;
     }
 
+    @Override
     public ILinkDiscovery.LinkType getLinkType(Link lt, LinkInfo info) {
         if (info.getUnicastValidTime() != null) {
             return ILinkDiscovery.LinkType.DIRECT_LINK;
@@ -326,7 +330,7 @@
         return ILinkDiscovery.LinkType.INVALID_LINK;
     }
 
-    
+
     private boolean isLinkDiscoverySuppressed(long sw, short portNumber) {
         return this.suppressLinkDiscovery.contains(new NodePortTuple(sw, portNumber));
     }
@@ -437,6 +441,7 @@
         }
     }
 
+    @Override
     public Set<Short> getQuarantinedPorts(long sw) {
         Set<Short> qPorts = new HashSet<Short>();
 
@@ -468,12 +473,12 @@
         else operation = UpdateOperation.PORT_DOWN;
 
         LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, operation));
-        
-        
+
+
         controller.publishUpdate(update);
     }
 
-    /** 
+    /**
      * Send LLDP on known ports
      */
     protected void discoverOnKnownLinkPorts() {
@@ -500,7 +505,7 @@
      */
     protected IOFSwitch addRemoteSwitch(long sw, short port) {
     	IOnosRemoteSwitch remotesw = null;
-    	
+
     	// add a switch if we have not seen it before
     	remotesw = remoteSwitches.get(sw);
 
@@ -510,26 +515,26 @@
         	remoteSwitches.put(remotesw.getId(), remotesw);
         	log.debug("addRemoteSwitch(): added fake remote sw {}", remotesw);
         }
-        
+
         // add the port if we have not seen it before
         if (remotesw.getPort(port) == null) {
         	OFPhysicalPort remoteport = new OFPhysicalPort();
         	remoteport.setPortNumber(port);
         	remoteport.setName("fake_" + port);
-        	remoteport.setConfig(0); 
+        	remoteport.setConfig(0);
         	remoteport.setState(0);
         	remotesw.setPort(remoteport);
         	log.debug("addRemoteSwitch(): added fake remote port {} to sw {}", remoteport, remotesw.getId());
         }
-        
+
         return remotesw;
     }
-    
+
     /**
      * Send link discovery message out of a given switch port.
      * The discovery message may be a standard LLDP or a modified
-     * LLDP, where the dst mac address is set to :ff.  
-     * 
+     * LLDP, where the dst mac address is set to :ff.
+     *
      * TODO: The modified LLDP will updated in the future and may
      * use a different eth-type.
      * @param sw
@@ -565,7 +570,7 @@
 
         if (isLinkDiscoverySuppressed(sw, port)) {
             /* Dont send LLDPs out of this port as suppressLLDPs set
-             * 
+             *
              */
             return;
         }
@@ -814,6 +819,16 @@
                 }
                 return Command.STOP;
             }
+            else if(sw <= remoteSwitch.getId()){
+                if (log.isTraceEnabled()) {
+                    log.trace("Getting BBDP from a different controller. myId {}: remoteId {}", myId, otherId);
+                    log.trace("and my controller id is smaller than the other, so quelching it. myPort {}: rPort {}", pi.getInPort(), remotePort);
+                  }
+                  //XXX ONOS: Fix the BDDP broadcast issue
+                  //return Command.CONTINUE;
+                  return Command.STOP;
+            }
+            /*
             else if (myId < otherId)  {
                 if (log.isTraceEnabled()) {
                     log.trace("Getting BDDP packets from a different controller" +
@@ -823,6 +838,7 @@
                 //return Command.CONTINUE;
                 return Command.STOP;
             }
+            */
         }
 
 
@@ -881,9 +897,9 @@
 
         addOrUpdateLink(lt, newLinkInfo);
 
-        // Check if reverse link exists. 
-        // If it doesn't exist and if the forward link was seen 
-        // first seen within a small interval, send probe on the 
+        // Check if reverse link exists.
+        // If it doesn't exist and if the forward link was seen
+        // first seen within a small interval, send probe on the
         // reverse link.
 
         newLinkInfo = links.get(lt);
@@ -927,8 +943,8 @@
 
     protected Command handlePacketIn(long sw, OFPacketIn pi,
                                      FloodlightContext cntx) {
-        Ethernet eth = 
-                IFloodlightProviderService.bcStore.get(cntx, 
+        Ethernet eth =
+                IFloodlightProviderService.bcStore.get(cntx,
                                                        IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
 
         if(eth.getEtherType() == Ethernet.TYPE_BSN) {
@@ -999,8 +1015,8 @@
                 newInfo.setFirstSeenTime(oldInfo.getFirstSeenTime());
 
             if (log.isTraceEnabled()) {
-                log.trace("addOrUpdateLink: {} {}", 
-                          lt, 
+                log.trace("addOrUpdateLink: {} {}",
+                          lt,
                           (newInfo.getMulticastValidTime()!=null) ? "multicast" : "unicast");
             }
 
@@ -1033,7 +1049,7 @@
                 // Add to portNOFLinks if the unicast valid time is null
                 if (newInfo.getUnicastValidTime() == null)
                     addLinkToBroadcastDomain(lt);
-                
+
                 // ONOS: Distinguish added event separately from updated event
                 updateOperation = UpdateOperation.LINK_ADDED;
                 linkChanged = true;
@@ -1119,6 +1135,7 @@
         return linkChanged;
     }
 
+    @Override
     public Map<Long, Set<Link>> getSwitchLinks() {
         return this.switchLinks;
     }
@@ -1198,7 +1215,7 @@
 
         // ONOS: If we do not control this switch, then we should not process its port status messages
         if (!registryService.hasControl(iofSwitch.getId())) return Command.CONTINUE;
-        
+
         if (log.isTraceEnabled()) {
             log.trace("handlePortStatus: Switch {} port #{} reason {}; " +
                     "config is {} state is {}",
@@ -1225,7 +1242,7 @@
                 LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, UpdateOperation.PORT_DOWN));
                 controller.publishUpdate(update);
                 linkDeleted = true;
-                } 
+                }
             else if (ps.getReason() ==
                     (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
                 // If ps is a port modification and the port state has changed
@@ -1237,7 +1254,7 @@
                         assert(linkInfo != null);
                         Integer updatedSrcPortState = null;
                         Integer updatedDstPortState = null;
-                        if (lt.getSrc() == npt.getNodeId() && 
+                        if (lt.getSrc() == npt.getNodeId() &&
                                 lt.getSrcPort() == npt.getPortId() &&
                                 (linkInfo.getSrcPortState() !=
                                 ps.getDesc().getState())) {
@@ -1264,7 +1281,7 @@
                                                      getLinkType(lt, linkInfo),
                                                      operation));
                             controller.publishUpdate(update);
-                            
+
                             linkInfoChanged = true;
                         }
                     }
@@ -1378,9 +1395,9 @@
             lock.writeLock().unlock();
         }
     }
-    
+
     /**
-     * We don't react the port changed notifications here. we listen for 
+     * We don't react the port changed notifications here. we listen for
      * OFPortStatus messages directly. Might consider using this notifier
      * instead
      */
@@ -1389,7 +1406,7 @@
         // no-op
     }
 
-    /** 
+    /**
      * Delete links incident on a given switch port.
      * @param npt
      * @param reason
@@ -1409,7 +1426,7 @@
         }
     }
 
-    /** 
+    /**
      * Iterates through the list of links and deletes if the
      * last discovery message reception time exceeds timeout values.
      */
@@ -1430,7 +1447,7 @@
 
                 // Timeout the unicast and multicast LLDP valid times
                 // independently.
-                if ((info.getUnicastValidTime() != null) && 
+                if ((info.getUnicastValidTime() != null) &&
                         (info.getUnicastValidTime() + (this.LINK_TIMEOUT * 1000) < curTime)){
                     info.setUnicastValidTime(null);
 
@@ -1440,7 +1457,7 @@
                     // the link would be deleted, which would trigger updateClusters().
                     linkChanged = true;
                 }
-                if ((info.getMulticastValidTime()!= null) && 
+                if ((info.getMulticastValidTime()!= null) &&
                         (info.getMulticastValidTime()+ (this.LINK_TIMEOUT * 1000) < curTime)) {
                     info.setMulticastValidTime(null);
                     // if uTime is not null, then link will remain as openflow
@@ -1451,7 +1468,7 @@
                 }
                 // Add to the erase list only if the unicast
                 // time is null.
-                if (info.getUnicastValidTime() == null && 
+                if (info.getUnicastValidTime() == null &&
                         info.getMulticastValidTime() == null){
                     eraseList.add(entry.getKey());
                 } else if (linkChanged) {
@@ -1510,11 +1527,11 @@
         srcNpt = new NodePortTuple(lt.getSrc(), lt.getSrcPort());
         dstNpt = new NodePortTuple(lt.getDst(), lt.getDstPort());
 
-        if (!portBroadcastDomainLinks.containsKey(lt.getSrc()))
+        if (!portBroadcastDomainLinks.containsKey(srcNpt))
             portBroadcastDomainLinks.put(srcNpt, new HashSet<Link>());
         portBroadcastDomainLinks.get(srcNpt).add(lt);
 
-        if (!portBroadcastDomainLinks.containsKey(lt.getDst()))
+        if (!portBroadcastDomainLinks.containsKey(dstNpt))
             portBroadcastDomainLinks.put(dstNpt, new HashSet<Link>());
         portBroadcastDomainLinks.get(dstNpt).add(lt);
     }
@@ -1575,7 +1592,7 @@
 
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleServices() {
-        Collection<Class<? extends IFloodlightService>> l = 
+        Collection<Class<? extends IFloodlightService>> l =
                 new ArrayList<Class<? extends IFloodlightService>>();
         l.add(ILinkDiscoveryService.class);
         //l.add(ITopologyService.class);
@@ -1586,7 +1603,7 @@
     public Map<Class<? extends IFloodlightService>, IFloodlightService>
     getServiceImpls() {
         Map<Class<? extends IFloodlightService>,
-        IFloodlightService> m = 
+        IFloodlightService> m =
         new HashMap<Class<? extends IFloodlightService>,
         IFloodlightService>();
         // We are the class that implements the service
@@ -1596,7 +1613,7 @@
 
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
-        Collection<Class<? extends IFloodlightService>> l = 
+        Collection<Class<? extends IFloodlightService>> l =
                 new ArrayList<Class<? extends IFloodlightService>>();
         l.add(IFloodlightProviderService.class);
         l.add(IThreadPoolService.class);
@@ -1679,7 +1696,7 @@
                     log.error("Exception in LLDP send timer.", e);
                 } finally {
                     if (!shuttingDown) {
-                    	// Always reschedule link discovery if we're not 
+                    	// Always reschedule link discovery if we're not
                     	// shutting down (no chance of SLAVE role now)
                         log.trace("Rescheduling discovery task");
                         discoveryTask.reschedule(DISCOVERY_TASK_INTERVAL,
@@ -1729,7 +1746,7 @@
         if ((sw.getChannel() != null) &&
                 (SocketAddress.class.isInstance(
                                                 sw.getChannel().getRemoteAddress()))) {
-            evTopoSwitch.ipv4Addr = 
+            evTopoSwitch.ipv4Addr =
                     IPv4.toIPv4Address(((InetSocketAddress)(sw.getChannel().
                             getRemoteAddress())).getAddress().getAddress());
             evTopoSwitch.l4Port   =
@@ -1787,10 +1804,12 @@
         evTopoCluster = evHistTopologyCluster.put(evTopoCluster, action);
     }
 
+    @Override
     public boolean isAutoPortFastFeature() {
         return autoPortFastFeature;
     }
 
+    @Override
     public void setAutoPortFastFeature(boolean autoPortFastFeature) {
         this.autoPortFastFeature = autoPortFastFeature;
     }