Merge pull request #520 from jonohart/arprefactor

Refactored the notification mechanism for getting other instances to send packet-outs.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 33091b9..7677ee8 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -18,8 +18,10 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.onrc.onos.datagrid.web.DatagridWebRoutable;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
-import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
-import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
+import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
@@ -78,12 +80,18 @@
     private IMap<String, byte[]> mapTopology = null;
     private MapTopologyListener mapTopologyListener = null;
     private String mapTopologyListenerId = null;
+    
+    // State related to the packet out map
+    protected static final String packetOutMapName = "packetOutMap";
+    private IMap<PacketOutNotification, byte[]> packetOutMap = null;
+    private List<IPacketOutEventHandler> packetOutEventHandlers = new ArrayList<IPacketOutEventHandler>();
 
-    // State related to the ARP map
-    protected static final String arpMapName = "arpMap";
-    private IMap<ArpMessage, byte[]> arpMap = null;
-    private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
     private final byte[] dummyByte = {0};
+    
+    // State related to the ARP reply map
+    protected static final String arpReplyMapName = "arpReplyMap";
+    private IMap<ArpReplyNotification, byte[]> arpReplyMap = null;
+    private List<IArpReplyEventHandler> arpReplyEventHandlers = new ArrayList<IArpReplyEventHandler>();
 
     /**
      * Class for receiving notifications for Flow state.
@@ -317,35 +325,22 @@
     }
 
     /**
-     * Class for receiving notifications for ARP requests.
+     * Class for receiving notifications for sending packet-outs.
      *
      * The datagrid map is:
-     *  - Key: Request ID (String)
-     *  - Value: ARP request packet (byte[])
+     *  - Key: Packet-out to send (PacketOutNotification)
+     *  - Value: dummy value (we only need the key) (byte[])
      */
-    class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
+    class PacketOutMapListener implements EntryListener<PacketOutNotification, byte[]> {
 		/**
 		 * Receive a notification that an entry is added.
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		@Override
-		public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
-		    for (IArpEventHandler arpEventHandler : arpEventHandlers) {
-		    	arpEventHandler.arpRequestNotification(event.getKey());
+		public void entryAdded(EntryEvent<PacketOutNotification, byte[]> event) {
+		    for (IPacketOutEventHandler packetOutEventHandler : packetOutEventHandlers) {
+		    	packetOutEventHandler.packetOutNotification(event.getKey());
 		    }
-
-		    //
-		    // Decode the value and deliver the notification
-		    //
-		    /*
-		    Kryo kryo = kryoFactory.newKryo();
-		    Input input = new Input(valueBytes);
-		    TopologyElement topologyElement =
-			kryo.readObject(input, TopologyElement.class);
-		    kryoFactory.deleteKryo(kryo);
-		    flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
-		    */
 		}
 
 		/**
@@ -353,8 +348,7 @@
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		@Override
-		public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
+		public void entryRemoved(EntryEvent<PacketOutNotification, byte[]> event) {
 			// Not used
 		}
 
@@ -363,8 +357,7 @@
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		@Override
-		public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
+		public void entryUpdated(EntryEvent<PacketOutNotification, byte[]> event) {
 			// Not used
 		}
 
@@ -373,11 +366,35 @@
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		@Override
-		public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
+		public void entryEvicted(EntryEvent<PacketOutNotification, byte[]> event) {
 		    // Not used
 		}
     }
+    
+    /**
+     * Class for receiving notifications that an ARP reply has been received.
+     *
+     * The datagrid map is:
+     *  - Key: ARP reply that was received (ArpReplyNotification)
+     *  - Value: dummy value (we only need the key) (byte[])
+     */
+    class ArpReplyMapListener implements EntryListener<ArpReplyNotification, byte[]> {
+		/**
+		 * Receive a notification that an entry is added.
+		 *
+		 * @param event the notification event for the entry.
+		 */
+		public void entryAdded(EntryEvent<ArpReplyNotification, byte[]> event) {
+		    for (IArpReplyEventHandler arpReplyEventHandler : arpReplyEventHandlers) {
+		    	arpReplyEventHandler.arpReplyEvent(event.getKey());
+		    }
+		}
+		
+		// These methods aren't used for ARP replies
+		public void entryRemoved(EntryEvent<ArpReplyNotification, byte[]> event) {}
+		public void entryUpdated(EntryEvent<ArpReplyNotification, byte[]> event) {}
+		public void entryEvicted(EntryEvent<ArpReplyNotification, byte[]> event) {}
+    }
 
     /**
      * Initialize the Hazelcast Datagrid operation.
@@ -494,9 +511,12 @@
 	hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
 
 	restApi.addRestletRoutable(new DatagridWebRoutable());
-
-	arpMap = hazelcastInstance.getMap(arpMapName);
-	arpMap.addEntryListener(new ArpMapListener(), true);
+	
+	packetOutMap = hazelcastInstance.getMap(packetOutMapName);
+	packetOutMap.addEntryListener(new PacketOutMapListener(), true);
+	
+	arpReplyMap = hazelcastInstance.getMap(arpReplyMapName);
+	arpReplyMap.addEntryListener(new ArpReplyMapListener(), true);
     }
 
     /**
@@ -557,15 +577,27 @@
     }
 
     @Override
-    public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
+    public void registerPacketOutEventHandler(IPacketOutEventHandler arpEventHandler) {
     	if (arpEventHandler != null) {
-    		arpEventHandlers.add(arpEventHandler);
+    		packetOutEventHandlers.add(arpEventHandler);
+    	}
+    }
+    
+    @Override
+    public void deregisterPacketOutEventHandler(IPacketOutEventHandler arpEventHandler) {
+    	packetOutEventHandlers.remove(arpEventHandler);
+    }
+    
+    @Override
+    public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
+    	if (arpReplyEventHandler != null) {
+    		arpReplyEventHandlers.add(arpReplyEventHandler);
     	}
     }
 
     @Override
-    public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
-    	arpEventHandlers.remove(arpEventHandler);
+    public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
+    	arpReplyEventHandlers.remove(arpReplyEventHandler);
     }
 
     /**
@@ -902,8 +934,12 @@
     }
 
     @Override
-    public void sendArpRequest(ArpMessage arpMessage) {
-    	//log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
-     	arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
+    public void sendPacketOutNotification(PacketOutNotification packetOutNotification) {
+     	packetOutMap.putAsync(packetOutNotification, dummyByte, 1L, TimeUnit.MILLISECONDS);
     }
+
+	@Override
+	public void sendArpReplyNotification(ArpReplyNotification arpReply) {
+		arpReplyMap.putAsync(arpReply, dummyByte, 1L, TimeUnit.MILLISECONDS);
+	}
 }
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 0f03d77..90fe57c 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -4,8 +4,10 @@
 
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
-import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
-import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
+import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
@@ -42,14 +44,18 @@
      * 
      * @param arpEventHandler The ARP event handler to register.
      */
-    public void registerArpEventHandler(IArpEventHandler arpEventHandler);
+    public void registerPacketOutEventHandler(IPacketOutEventHandler arpEventHandler);
     
     /**
      * De-register event handler service for ARP events.
      * 
      * @param arpEventHandler The ARP event handler to de-register.
      */
-    public void deregisterArpEventHandler(IArpEventHandler arpEventHandler);
+    public void deregisterPacketOutEventHandler(IPacketOutEventHandler arpEventHandler);
+    
+    public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler);
+    
+    public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler);
 
     /**
      * Get all Flows that are currently in the datagrid.
@@ -167,8 +173,21 @@
     void notificationSendAllTopologyElementsRemoved();
     
     /**
-     * Send an ARP request to other ONOS instances
-     * @param arpRequest The request packet to send
+     * Send a packet-out notification to other ONOS instances. This informs
+     * other instances that they should send this packet out some of the ports
+     * they control. Not all notifications are applicable to all instances 
+     * (e.g. some notifications specify a single port to send the packet out),
+     * so each instance must determine whether it needs to take action when it
+     * receives the notification.
+     * 
+     * @param packetOutNotification The packet notification to send
      */
-    public void sendArpRequest(ArpMessage arpMessage);  
+    public void sendPacketOutNotification(PacketOutNotification packetOutNotification);
+    
+    /**
+     * Send notification to other ONOS instances that an ARP reply has been 
+     * received.
+     * @param arpReply The notification of the ARP reply
+     */
+    public void sendArpReplyNotification(ArpReplyNotification arpReply);
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
index 33280a6..d16b8b4 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
@@ -41,7 +41,6 @@
 import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryService;
 import net.onrc.onos.ofcontroller.proxyarp.BgpProxyArpManager;
 import net.onrc.onos.ofcontroller.proxyarp.IArpRequester;
-import net.onrc.onos.ofcontroller.proxyarp.IProxyArpService;
 import net.onrc.onos.ofcontroller.topology.ITopologyNetService;
 import net.onrc.onos.ofcontroller.topology.Topology;
 import net.onrc.onos.ofcontroller.topology.TopologyManager;
@@ -78,8 +77,7 @@
 
 public class BgpRoute implements IFloodlightModule, IBgpRouteService, 
 									ITopologyListener, IArpRequester,
-									IOFSwitchListener, IConfigInfoService,
-									IProxyArpService {
+									IOFSwitchListener, IConfigInfoService {
 	
 	private final static Logger log = LoggerFactory.getLogger(BgpRoute.class);
 
@@ -1287,27 +1285,4 @@
 	public short getVlan() {
 		return vlan;
 	}
-
-	/*
-	 * TODO This is a hack to get the REST API to work for ProxyArpManager.
-	 * The REST API is currently tied to the Floodlight module system and we
-	 * need to separate it to allow ONOS modules to use it. For now we will 
-	 * proxy calls through to the ProxyArpManager (which is not a Floodlight 
-	 * module) through this class which is a module.
-	 */
-	@Override
-	public MACAddress getMacAddress(InetAddress ipAddress) {
-		return proxyArp.getMacAddress(ipAddress);
-	}
-
-	@Override
-	public void sendArpRequest(InetAddress ipAddress, IArpRequester requester,
-			boolean retry) {
-		proxyArp.sendArpRequest(ipAddress, requester, retry);		
-	}
-
-	@Override
-	public List<String> getMappings() {
-		return proxyArp.getMappings();
-	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
index ea547fc..68c2b7c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
@@ -20,6 +20,7 @@
 import net.floodlightcontroller.devicemanager.IDeviceListener;
 import net.floodlightcontroller.routing.Link;
 import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.graph.GraphDBConnection;
 import net.onrc.onos.graph.GraphDBOperation;
@@ -37,7 +38,7 @@
 import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryListener;
 import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryService;
 import net.onrc.onos.ofcontroller.linkdiscovery.LinkInfo;
-import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
+import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.registry.controller.IControllerRegistryService;
 import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
@@ -384,8 +385,9 @@
 		log.debug("{}:deviceAdded(): Adding device {}",this.getClass(),device.getMACAddressString());
 		devStore.addDevice(device);
 		for (int intIpv4Address : device.getIPv4Addresses()) {
-			datagridService.sendArpRequest(
-					ArpMessage.newReply(InetAddresses.fromInteger(intIpv4Address)));
+			datagridService.sendArpReplyNotification(new ArpReplyNotification(
+					InetAddresses.fromInteger(intIpv4Address), 
+					MACAddress.valueOf(device.getMACAddress())));
 		}
 	}
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 8ca3989..6d953f1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -1,6 +1,5 @@
 package net.onrc.onos.ofcontroller.forwarding;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -16,7 +15,6 @@
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.packet.Ethernet;
-import net.floodlightcontroller.packet.IPv4;
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.ofcontroller.core.IDeviceStorage;
@@ -27,7 +25,8 @@
 import net.onrc.onos.ofcontroller.devicemanager.IOnosDeviceService;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
 import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
-import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
+import net.onrc.onos.ofcontroller.proxyarp.IProxyArpService;
+import net.onrc.onos.ofcontroller.proxyarp.BroadcastPacketOutNotification;
 import net.onrc.onos.ofcontroller.topology.TopologyManager;
 import net.onrc.onos.ofcontroller.util.CallerId;
 import net.onrc.onos.ofcontroller.util.DataPath;
@@ -53,7 +52,6 @@
 
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.common.net.InetAddresses;
 
 public class Forwarding implements IOFMessageListener, IFloodlightModule,
 									IForwardingService {
@@ -175,6 +173,9 @@
 		dependencies.add(IFlowService.class);
 		dependencies.add(IFlowPusherService.class);
 		dependencies.add(IOnosDeviceService.class);
+		// We don't use the IProxyArpService directly, but reactive forwarding
+		// requires it to be loaded and answering ARP requests
+		dependencies.add(IProxyArpService.class);
 		return dependencies;
 	}
 	
@@ -256,24 +257,9 @@
 		if (log.isTraceEnabled()) {
 			log.trace("Sending broadcast packet to other ONOS instances");
 		}
-		
-		IPv4 ipv4Packet = (IPv4) eth.getPayload();
-		
-		// TODO We'll put the destination address here, because the current
-		// architecture needs an address. Addresses are only used for replies
-		// however, which don't apply to non-ARP packets. The ArpMessage class
-		// has become a bit too overloaded and should be refactored to 
-		// handle all use cases nicely.
-		 InetAddress targetAddress = 
-				InetAddresses.fromInteger(ipv4Packet.getDestinationAddress());
-		
-		// Piggy-back on the ARP mechanism to broadcast this packet out the
-		// edge. Luckily the ARP module doesn't check that the packet is
-		// actually ARP before broadcasting, so we can trick it into sending
-		// our non-ARP packets.
-		// TODO This should be refactored later to account for the new use case.
-		datagrid.sendArpRequest(ArpMessage.newRequest(targetAddress, eth.serialize(),
-				-1L, (short)-1, sw.getId(), pi.getInPort()));
+
+		 datagrid.sendPacketOutNotification(new BroadcastPacketOutNotification(
+				 eth.serialize(), sw.getId(), pi.getInPort()));
 	}
 	
 	private void handlePacketIn(IOFSwitch sw, OFPacketIn pi, Ethernet eth) {
@@ -303,7 +289,6 @@
 		long destinationDpid = HexString.toLong(switchObject.getDPID());
 		
 		// TODO SwitchPort, Dpid and Port should probably be immutable
-		// (also, are Dpid and Port are even necessary?)
 		SwitchPort srcSwitchPort = new SwitchPort(
 				new Dpid(sw.getId()), new Port(pi.getInPort())); 
 		SwitchPort dstSwitchPort = new SwitchPort(
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpReplyNotification.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpReplyNotification.java
new file mode 100644
index 0000000..a8afc55
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpReplyNotification.java
@@ -0,0 +1,28 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+
+import net.floodlightcontroller.util.MACAddress;
+
+public class ArpReplyNotification implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	private InetAddress targetAddress;
+	private MACAddress targetMacAddress;
+	
+	public ArpReplyNotification(InetAddress targetAddress, MACAddress targetMacAddress) {
+		this.targetAddress = targetAddress;
+		this.targetMacAddress = targetMacAddress;
+	}
+
+	public InetAddress getTargetAddress() {
+		return targetAddress;
+	}
+
+	public MACAddress getTargetMacAddress() {
+		return targetMacAddress;
+	}
+
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/BroadcastPacketOutNotification.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/BroadcastPacketOutNotification.java
new file mode 100644
index 0000000..73d2163
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/BroadcastPacketOutNotification.java
@@ -0,0 +1,34 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+/**
+ * Notification to all ONOS instances to broadcast this packet out the edge of
+ * the network. The edge is defined as any port that doesn't have a link to
+ * another switch. The one exception is the port that the packet was received
+ * on.
+ *
+ */
+public class BroadcastPacketOutNotification extends
+		PacketOutNotification {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private final long inSwitch;
+	private final short inPort;
+
+	public BroadcastPacketOutNotification(byte[] packet, long inSwitch, 
+			short inPort) {
+		super(packet);
+		
+		this.inSwitch = inSwitch;
+		this.inPort = inPort;
+	}
+
+	public long getInSwitch() {
+		return inSwitch;
+	}
+
+	public short getInPort() {
+		return inPort;
+	}
+
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
deleted file mode 100644
index 4ec32ec..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package net.onrc.onos.ofcontroller.proxyarp;
-
-public interface IArpEventHandler {
-
-	/**
-	 * Notify the ARP event handler that an ARP request has been received.
-	 * @param id The string ID of the ARP request
-	 * @param arpRequest The ARP request packet
-	 */
-	public void arpRequestNotification(ArpMessage arpMessage);
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java
new file mode 100644
index 0000000..75f1d5d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java
@@ -0,0 +1,5 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+public interface IArpReplyEventHandler {
+	public void arpReplyEvent(ArpReplyNotification arpReply);
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java
new file mode 100644
index 0000000..86b3728
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java
@@ -0,0 +1,18 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+/**
+ * Classes may implement this interface if they wish to subscribe to 
+ * packet out notifications from the datagrid service. Packet out notifications
+ * are used to direct other ONOS instances to send packets out particular
+ * ports under their control.
+ *
+ */
+public interface IPacketOutEventHandler {
+
+	/**
+	 * Notify the packet out event handler that a packet out notification has
+	 * been received.
+	 * @param packetOutNotification An object describing the notification
+	 */
+	public void packetOutNotification(PacketOutNotification packetOutNotification);
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/PacketOutNotification.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/PacketOutNotification.java
new file mode 100644
index 0000000..3d37d25
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/PacketOutNotification.java
@@ -0,0 +1,21 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+import java.io.Serializable;
+
+/**
+ * A PacketOutNotification contains data sent between ONOS instances that
+ * directs other instances to send a packet out a set of ports.
+ * This is an abstract base class that will be subclassed by specific
+ * types of notifications.
+ *
+ */
+public abstract class PacketOutNotification implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	protected final byte[] packet;
+
+	public PacketOutNotification(byte[] packet) {
+		this.packet = packet;
+	}
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
index 289e0e2..70533b0 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
@@ -37,6 +37,7 @@
 import net.onrc.onos.ofcontroller.core.config.IConfigInfoService;
 import net.onrc.onos.ofcontroller.core.internal.DeviceStorageImpl;
 import net.onrc.onos.ofcontroller.core.internal.TopoSwitchServiceImpl;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
 import net.onrc.onos.ofcontroller.util.Dpid;
 import net.onrc.onos.ofcontroller.util.Port;
 import net.onrc.onos.ofcontroller.util.SwitchPort;
@@ -58,7 +59,8 @@
 import com.google.common.net.InetAddresses;
 
 public class ProxyArpManager implements IProxyArpService, IOFMessageListener,
-										IArpEventHandler, IFloodlightModule {
+										IPacketOutEventHandler, IArpReplyEventHandler, 
+										IFloodlightModule {
 	private final static Logger log = LoggerFactory.getLogger(ProxyArpManager.class);
 	
 	private final long ARP_TIMER_PERIOD = 100; //ms  
@@ -70,6 +72,7 @@
 	private IDatagridService datagrid;
 	private IConfigInfoService configService;
 	private IRestApiService restApi;
+	private IFlowPusherService flowPusher;
 	
 	private IDeviceStorage deviceStorage;
 	private volatile ITopoSwitchService topoSwitchService;
@@ -153,6 +156,7 @@
 		dependencies.add(IRestApiService.class);
 		dependencies.add(IDatagridService.class);
 		dependencies.add(IConfigInfoService.class);
+		dependencies.add(IFlowPusherService.class);
 		return dependencies;
 	}
 	
@@ -164,6 +168,7 @@
 		this.datagrid = context.getServiceImpl(IDatagridService.class);
 		this.configService = context.getServiceImpl(IConfigInfoService.class);
 		this.restApi = context.getServiceImpl(IRestApiService.class);
+		this.flowPusher = context.getServiceImpl(IFlowPusherService.class);
 		
 		//arpCache = new ArpCache();
 
@@ -181,7 +186,7 @@
 		restApi.addRestletRoutable(new ArpWebRoutable());
 		floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
 		
-		datagrid.registerArpEventHandler(this);
+		datagrid.registerPacketOutEventHandler(this);
 		
 		deviceStorage = new DeviceStorageImpl();
 		deviceStorage.init("");
@@ -291,7 +296,7 @@
 			}
 			else if (arp.getOpCode() == ARP.OP_REPLY) {
 				handleArpReply(sw, pi, arp);
-				sendToOtherNodesReply(eth, pi);
+				sendReplyNotification(eth, pi);
 			}
 			
 			// Stop ARP packets here
@@ -345,7 +350,9 @@
 			}
 			
 			// We don't know the device so broadcast the request out
-			sendToOtherNodes(eth, sw.getId(), pi);
+			datagrid.sendPacketOutNotification(
+					new BroadcastPacketOutNotification(eth.serialize(), 
+							sw.getId(), pi.getInPort()));
 		}
 		else {
 			// Even if the device exists in our database, we do not reply to
@@ -362,7 +369,6 @@
 
 			// sendArpReply(arp, sw.getId(), pi.getInPort(), macAddress);
 
-			log.trace("Checking the device info from DB is still valid or not");
 			Iterable<IPortObject> outPorts = targetDevice.getAttachedPorts();	
 
 			if (!outPorts.iterator().hasNext()){
@@ -371,19 +377,26 @@
 							" - broadcasting", macAddress);
 				}
 				
-				sendToOtherNodes(eth, sw.getId(), pi);
+				datagrid.sendPacketOutNotification(
+						new BroadcastPacketOutNotification(eth.serialize(), 
+								sw.getId(), pi.getInPort()));
 			} 
 			else {
 				for (IPortObject portObject : outPorts) {
-					long outSwitch = 0;
-					short outPort = 0;
+					//long outSwitch = 0;
+					//short outPort = 0;
 
+					/*
 					if (!portObject.getLinkedPorts().iterator().hasNext()) {
 						outPort = portObject.getNumber();					
+					}*/
+					if (portObject.getLinkedPorts().iterator().hasNext()) {
+						continue;
 					}
 
+					short outPort = portObject.getNumber();
 					ISwitchObject outSwitchObject = portObject.getSwitch();
-					outSwitch = HexString.toLong(outSwitchObject.getDPID());
+					long outSwitch = HexString.toLong(outSwitchObject.getDPID());
 
 					if (log.isTraceEnabled()) {
 						log.trace("Probing device {} on port {}/{}", 
@@ -391,7 +404,9 @@
 								HexString.toHexString(outSwitch), outPort});
 					}
 					
-					sendToOtherNodes(eth, pi, outSwitch, outPort);
+					datagrid.sendPacketOutNotification(
+							new SinglePacketOutNotification(eth.serialize(), 
+									outSwitch, outPort));
 				}
 			}
 		}
@@ -517,50 +532,7 @@
 		}
 	}
 	
-	private void sendToOtherNodes(Ethernet eth, long inSwitchId, OFPacketIn pi) {
-		ARP arp = (ARP) eth.getPayload();
-		
-		if (log.isTraceEnabled()) {
-			log.trace("Sending ARP request for {} to other ONOS instances",
-					inetAddressToString(arp.getTargetProtocolAddress()));
-		}
-		
-		InetAddress targetAddress;
-		try {
-			targetAddress = InetAddress.getByAddress(arp.getTargetProtocolAddress());
-		} catch (UnknownHostException e) {
-			log.error("Unknown host", e);
-			return;
-		}
-		
-		datagrid.sendArpRequest(ArpMessage.newRequest(targetAddress, eth.serialize(),
-				-1L, (short)-1, inSwitchId, pi.getInPort()));
-	}
-	
-	//hazelcast to other ONOS instances to send the ARP packet out on outPort of outSwitch
-	private void sendToOtherNodes(Ethernet eth, OFPacketIn pi, long outSwitch, short outPort) {
-		ARP arp = (ARP) eth.getPayload();
-		
-		if (log.isTraceEnabled()) {
-			log.trace("Sending ARP request for {} to other ONOS instances with outSwitch {} ",
-					inetAddressToString(arp.getTargetProtocolAddress()), String.valueOf(outSwitch));
-		}
-		
-		InetAddress targetAddress;
-		try {
-			targetAddress = InetAddress.getByAddress(arp.getTargetProtocolAddress());
-		} catch (UnknownHostException e) {
-			log.error("Unknown host", e);
-			return;
-		}
-		
-		datagrid.sendArpRequest(ArpMessage.newRequest(targetAddress, eth.serialize(), outSwitch, outPort)); 
-		//datagrid.sendArpRequest(ArpMessage.newRequest(targetAddress, eth.serialize()));
-		
-		
-	}
-	
-	private void sendToOtherNodesReply(Ethernet eth, OFPacketIn pi) {
+	private void sendReplyNotification(Ethernet eth, OFPacketIn pi) {
 		ARP arp = (ARP) eth.getPayload();
 		
 		if (log.isTraceEnabled()) {
@@ -577,12 +549,14 @@
 			log.error("Unknown host", e);
 			return;
 		}
-		
-		datagrid.sendArpRequest(ArpMessage.newReply(targetAddress, mac));
-		//datagrid.sendArpReply(ArpMessage.newRequest(targetAddress, eth.serialize()));
-	
+
+		datagrid.sendArpReplyNotification(new ArpReplyNotification(targetAddress, mac));
 	}
 	
+	// This remains from the older single-instance ARP code. It used Floodlight
+	// APIs to find the edge of the network, but only worked on a single instance.
+	// We now do this using ONOS network graph APIs.
+	@Deprecated
 	private void broadcastArpRequestOutEdge(byte[] arpRequest, long inSwitch, short inPort) {
 		for (IOFSwitch sw : floodlightProvider.getSwitches().values()){
 			Collection<Short> enabledPorts = sw.getEnabledPortNumbers();
@@ -673,12 +647,7 @@
 			po.setLengthU(OFPacketOut.MINIMUM_LENGTH + actionsLength 
 					+ arpRequest.length);
 			
-			try {
-				sw.write(po, null);
-				sw.flush();
-			} catch (IOException e) {
-				log.error("Failure writing packet out to switch", e);
-			}
+			flowPusher.add(sw, po);
 		}
 		
 		if (log.isTraceEnabled()) {
@@ -712,12 +681,7 @@
 			return;
 		}
 		
-		try {
-			sw.write(po, null);
-			sw.flush();
-		} catch (IOException e) {
-			log.error("Failure writing packet out to switch", e);
-		}
+		flowPusher.add(sw, po);
 	}
 	
 	private void sendArpReply(ARP arpRequest, long dpid, short port, MACAddress targetMac) {
@@ -740,7 +704,6 @@
 			.setTargetProtocolAddress(arpRequest.getSenderProtocolAddress());
 		
 
-		
 		Ethernet eth = new Ethernet();
 		eth.setDestinationMACAddress(arpRequest.getSenderHardwareAddress())
 			.setSourceMACAddress(targetMac.toBytes())
@@ -775,12 +738,7 @@
 			return;
 		}
 		
-		try {
-			sw.write(msgList, null);
-			sw.flush();
-		} catch (IOException e) {
-			log.error("Failure writing packet out to switch", e);
-		}
+		flowPusher.add(sw, po);
 	}
 	
 	private String inetAddressToString(byte[] bytes) {
@@ -820,9 +778,6 @@
 	}
 
 	/*
-	 * IArpEventHandler methods
-	 */
-	
 	@Override
 	public void arpRequestNotification(ArpMessage arpMessage) {
 		log.debug("Received ARP notification from other instances");
@@ -844,6 +799,7 @@
 			break;
 		}
 	}
+	*/
 	
 	private void sendArpReplyToWaitingRequesters(InetAddress address, MACAddress mac) {
 		log.debug("Sending ARP reply for {} to requesters", 
@@ -876,4 +832,33 @@
 			request.dispatchReply(address, mac);
 		}
 	}
+
+	@Override
+	public void arpReplyEvent(ArpReplyNotification arpReply) {
+		log.debug("Received ARP reply notification for {}",
+				arpReply.getTargetAddress());
+		sendArpReplyToWaitingRequesters(arpReply.getTargetAddress(), 
+				arpReply.getTargetMacAddress());
+	}
+
+	@Override
+	public void packetOutNotification(
+			PacketOutNotification packetOutNotification) {
+		
+		if (packetOutNotification instanceof SinglePacketOutNotification) {
+			SinglePacketOutNotification notification = 
+					(SinglePacketOutNotification) packetOutNotification;
+			sendArpRequestOutPort(notification.packet, notification.getOutSwitch(), 
+					notification.getOutPort());
+		}
+		else if (packetOutNotification instanceof BroadcastPacketOutNotification) {
+			BroadcastPacketOutNotification notification = 
+					(BroadcastPacketOutNotification) packetOutNotification;
+			broadcastArpRequestOutMyEdge(notification.packet, 
+					notification.getInSwitch(), notification.getInPort());
+		}
+		else {
+			log.warn("Unknown packet out notification received");
+		}
+	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/SinglePacketOutNotification.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/SinglePacketOutNotification.java
new file mode 100644
index 0000000..1919d87
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/SinglePacketOutNotification.java
@@ -0,0 +1,30 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+/**
+ * Notification to another ONOS instance to send a packet out a single port.
+ *
+ */
+public class SinglePacketOutNotification extends PacketOutNotification {
+
+	private static final long serialVersionUID = 1L;
+	
+	private final long outSwitch;
+	private final short outPort;
+	
+	public SinglePacketOutNotification(byte[] packet, long outSwitch, 
+			short outPort) {
+		super(packet);
+		
+		this.outSwitch = outSwitch;
+		this.outPort = outPort;
+	}
+
+	public long getOutSwitch() {
+		return outSwitch;
+	}
+
+	public short getOutPort() {
+		return outPort;
+	}
+
+}