Squashed commit of the following:

commit b905df2f2634c892e6cf322efe59c334d9fe61a7
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Mon Nov 18 17:00:25 2013 -0800

    Added FlowSync flag to FlowProgrammer.

commit 10e0856a42c855b3823efc67e90786add29e19ab
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Mon Nov 18 10:55:23 2013 -0800

    Added FlowPusher enabling flag (false by default).

commit dab231cc977eb013b344dc886fd0b543148f05ca
Author: Brian O'Connor <bocon@onlab.us>
Date:   Thu Nov 14 21:11:50 2013 -0800

    reorganized and cleaned up FlowSynchronizer

commit 1c381e099f286ac27023b22c2f534d713d493c6b
Author: Brian O'Connor <bocon@onlab.us>
Date:   Thu Nov 14 18:41:48 2013 -0800

    created FlowProgrammer module and implemented FlowPusher and FlowSynchronizer as services, also modified FlowManager to use the service exported by FlowProgrammer

commit ddbc870fc089f72765cf8abee3c79faf226616d5
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Wed Nov 13 10:43:33 2013 -0800

    Limited number of messages sent at once

commit 7d74dea34013611f159833ecfd4a7ad95307f9d8
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Tue Nov 12 17:58:34 2013 -0800

    Separated flowprogrammer package from flowmanager.

commit c11fda8d1c1e058ededf5f602e5ed3faefdb3e36
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Tue Nov 12 14:59:43 2013 -0800

    Made FlowPusher multi-threaded.

commit 21b7c1fa8c2d2c0459407510bbd4d224ee9ca804
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Tue Nov 12 11:21:36 2013 -0800

    Added FlowPath/FlowEntry version interfaces to FlowPusher

commit ec4c43da51a3807defba115148e886a4b31eb8c8
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Mon Nov 11 15:51:17 2013 -0800

    Finished implementing FlowPusherService.

commit 267cfaafbf4d7d7ac5ec82bbce076d8616d552a8
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Tue Nov 5 15:52:38 2013 -0800

    Modified FlowManager to use FlowPusher instead of OFMessageDamper and FlowSwitchOperation.

commit 0b8f3a6cf6d27f0986770b1b93935dfdfee2f38c
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Tue Nov 5 10:18:12 2013 -0800

    Refactored FlowPusher

commit 73c1690e626b61945ebbe166b3fc8af259348ac6
Author: Brian O'Connor <bocon@onlab.us>
Date:   Tue Nov 5 15:26:19 2013 -0800

    removing FlowSync from Floodlight modules

commit e9f5881a0a5ab1b1f0a0b93706d282ee420b71e6
Author: Brian O'Connor <bocon@onlab.us>
Date:   Tue Nov 5 15:17:44 2013 -0800

    incremental commit before changing origins; moving FlowSync to service from module, added helper code to construct and send OF messages

commit 669cb73862ede66dcb8cd3a35cf3c5e6d0903172
Author: Brian O'Connor <bocon@onlab.us>
Date:   Wed Oct 30 20:49:59 2013 -0700

    adding FlowSynchronizer module

commit 0baf9c1f0d4cf9a0c1fba7cd39fc600440ff7a92
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Thu Oct 31 17:14:13 2013 -0700

    Remove "suspended" member from FlowQueueTable.

commit f9d4a9a57570d9b012365a22485210ed3ce25774
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Thu Oct 31 15:36:01 2013 -0700

    Created FlowQueueTable basic structure.

commit c54f0df74b9d0d79919b57afdd56cdb3153af210
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Thu Oct 31 11:40:32 2013 -0700

    Added comments

commit 8a74ddad4c2b6e983c201f41443f5625fd713114
Author: Naoki Shiota <n-shiota@onlab.us>
Date:   Thu Oct 31 10:55:32 2013 -0700

    Added FlowPusher.java basic implementation

commit f6e58e13d76e127a2ff86f9c7af8ea52115d85db
Merge: 6d7e0c4 71d458c
Author: pavlin-r <pavlin@onlab.us>
Date:   Thu Nov 14 13:51:34 2013 -0800

    Merge pull request #451 from jonohart/master

    Incremental changes to proxy ARP and reactive forwarding

commit 71d458ca5f3887b400adb7be7ed74b9d42e77e04
Author: Jonathan Hart <jono@onlab.us>
Date:   Thu Nov 14 13:40:16 2013 -0800

    Use FlowManager's flow ID service rather than have Forwarding generate its own IDs

commit 293b03c4749fa579d931115ff246f561a43c4b4f
Author: Jonathan Hart <jono@onlab.us>
Date:   Thu Nov 14 13:28:17 2013 -0800

    Reverted eclipse auto-indent on these files

commit f05769c7539d3f9d9f941cd4f9c582c3a3760854
Author: Jonathan Hart <jono@onlab.us>
Date:   Thu Nov 14 11:36:06 2013 -0800

    Implemented hashCode() and equals() on Dpid, Port and SwitchPort in order to make SwitchPort comparison easier

commit 41d364178b7042891bac6bcfd6525e312adf5e22
Author: Jonathan Hart <jono@onlab.us>
Date:   Thu Nov 14 11:34:59 2013 -0800

    Improvements to Forwarding to not add flows that already exist

commit a94627b26540c1579f73f485dca3330353de7ebe
Author: Jonathan Hart <jono@onlab.us>
Date:   Thu Nov 14 11:33:46 2013 -0800

    Changes to Device and Switch storage to improve accuracy of device data. For now we will clear the devices when we add or remove ports.

commit c7579becf2f556fb10f8ea25c7607f395ddd1333
Author: Jonathan Hart <jono@onlab.us>
Date:   Tue Nov 12 12:57:29 2013 -0800

    Cleaned up ARP stuff in the HazelcastDatagrid slightly

commit a136fe441a7b76bb312dad0b38900f8dc3dfd663
Merge: 4ca022b 6d7e0c4
Author: Jonathan Hart <jono@onlab.us>
Date:   Tue Nov 12 11:56:48 2013 -0800

    Merge branch 'master' of https://github.com/OPENNETWORKINGLAB/ONOS

commit 4ca022b3023a59794dd3b15310ae5034b1a89a0a
Author: Jonathan Hart <jono@onlab.us>
Date:   Tue Nov 12 11:56:23 2013 -0800

    Minor changes to ARP

commit 0a0ac376619e7c589c7e217bc9981b5148b064b2
Merge: 4b7fd06 5361e9d
Author: Jonathan Hart <jono@onlab.us>
Date:   Mon Nov 11 22:50:00 2013 -0800

    Merge in changes to Forwarding

commit 4b7fd06b4466ccfd8eae7c517f4c5eb4d40ec653
Author: Jonathan Hart <jono@onlab.us>
Date:   Mon Nov 11 22:49:55 2013 -0800

    Began implementing IPC with Hazelcast in order to broadcast ARP via other ONOS nodes

commit 5361e9dbab638d0ff339bc8f09e0052a1425e6dc
Author: Jonathan Hart <jono@onlab.us>
Date:   Thu Nov 7 22:41:32 2013 -0800

    Fixed the use of the FlowManager API to provide rudimentary reactive forwarding
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index d9fb7c3..04e001e 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -5,12 +5,10 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
-import com.esotericsoftware.kryo2.Kryo;
-import com.esotericsoftware.kryo2.io.Input;
-import com.esotericsoftware.kryo2.io.Output;
+import java.util.concurrent.TimeUnit;
 
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
@@ -18,9 +16,9 @@
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.restserver.IRestApiService;
-
 import net.onrc.onos.datagrid.web.DatagridWebRoutable;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
+import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
@@ -28,9 +26,13 @@
 import net.onrc.onos.ofcontroller.util.FlowPath;
 import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
 
+import org.openflow.util.HexString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.esotericsoftware.kryo2.Kryo;
+import com.esotericsoftware.kryo2.io.Input;
+import com.esotericsoftware.kryo2.io.Output;
 import com.hazelcast.config.Config;
 import com.hazelcast.config.FileSystemXmlConfig;
 import com.hazelcast.core.EntryEvent;
@@ -76,6 +78,12 @@
     private IMap<String, byte[]> mapTopology = null;
     private MapTopologyListener mapTopologyListener = null;
     private String mapTopologyListenerId = null;
+    
+    // State related to the ARP map
+    protected static final String arpMapName = "arpMap";
+    private IMap<byte[], byte[]> arpMap = null;
+    private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
+    private final byte[] dummyByte = {0};
 
     /**
      * Class for receiving notifications for Flow state.
@@ -322,6 +330,65 @@
 	    // NOTE: We don't use eviction for this map
 	}
     }
+    
+    /**
+     * Class for receiving notifications for ARP requests.
+     *
+     * The datagrid map is:
+     *  - Key: ARP request packet (byte[])
+     *  - Value: Dummy placeholder byte (byte[])
+     */
+    class ArpMapListener implements EntryListener<byte[], byte[]> {
+		/**
+		 * Receive a notification that an entry is added.
+		 *
+		 * @param event the notification event for the entry.
+		 */
+		public void entryAdded(EntryEvent<byte[], byte[]> event) {
+		    for (IArpEventHandler arpEventHandler : arpEventHandlers) {
+		    	arpEventHandler.arpRequestNotification(event.getKey());
+		    }
+		    
+		    //
+		    // Decode the value and deliver the notification
+		    //
+		    /*
+		    Kryo kryo = kryoFactory.newKryo();
+		    Input input = new Input(valueBytes);
+		    TopologyElement topologyElement =
+			kryo.readObject(input, TopologyElement.class);
+		    kryoFactory.deleteKryo(kryo);
+		    flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
+		    */
+		}
+	
+		/**
+		 * Receive a notification that an entry is removed.
+		 *
+		 * @param event the notification event for the entry.
+		 */
+		public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+			// Not used
+		}
+	
+		/**
+		 * Receive a notification that an entry is updated.
+		 *
+		 * @param event the notification event for the entry.
+		 */
+		public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+			// Not used
+		}
+	
+		/**
+		 * Receive a notification that an entry is evicted.
+		 *
+		 * @param event the notification event for the entry.
+		 */
+		public void entryEvicted(EntryEvent<byte[], byte[]> event) {
+		    // Not used
+		}
+    }
 
     /**
      * Initialize the Hazelcast Datagrid operation.
@@ -437,6 +504,9 @@
 	hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
 
 	restApi.addRestletRoutable(new DatagridWebRoutable());
+	
+	arpMap = hazelcastInstance.getMap(arpMapName);
+	arpMap.addEntryListener(new ArpMapListener(), true);
     }
 
     /**
@@ -495,7 +565,19 @@
 
 	this.flowEventHandlerService = null;
     }
-
+    
+    @Override
+    public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
+    	if (arpEventHandler != null) {
+    		arpEventHandlers.add(arpEventHandler);
+    	}
+    }
+    
+    @Override
+    public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
+    	arpEventHandlers.remove(arpEventHandler);
+    }
+    
     /**
      * Get all Flows that are currently in the datagrid.
      *
@@ -782,4 +864,10 @@
 	    mapTopology.removeAsync(key);
 	}
     }
+    
+    @Override
+    public void sendArpRequest(byte[] arpRequest) {
+    	log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
+     	arpMap.putAsync(arpRequest, dummyByte, 1L, TimeUnit.MILLISECONDS);
+    }
 }
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 1bcf601..3f40480 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -3,8 +3,8 @@
 import java.util.Collection;
 
 import net.floodlightcontroller.core.module.IFloodlightService;
-
 import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
+import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
@@ -37,6 +37,20 @@
     void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService);
 
     /**
+     * Register event handler for ARP events.
+     * 
+     * @param arpEventHandler The ARP event handler to register.
+     */
+    public void registerArpEventHandler(IArpEventHandler arpEventHandler);
+    
+    /**
+     * De-register event handler for ARP events.
+     * 
+     * @param arpEventHandler The ARP event handler to de-register.
+     */
+    public void deregisterArpEventHandler(IArpEventHandler arpEventHandler);
+    
+    /**
      * Get all Flows that are currently in the datagrid.
      *
      * @return all Flows that are currently in the datagrid.
@@ -134,4 +148,10 @@
      * Send a notification that all Topology Elements are removed.
      */
     void notificationSendAllTopologyElementsRemoved();
+    
+    /**
+     * Send an ARP request to other ONOS instances.
+     * @param arpRequest The ARP request packet to send.
+     */
+    public void sendArpRequest(byte[] arpRequest);
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
index cfc411c..e9e2bd1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
@@ -1,5 +1,6 @@
 package net.onrc.onos.ofcontroller.core.internal;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -16,6 +17,7 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
+import com.google.common.net.InetAddresses;
 import com.thinkaurelius.titan.core.TitanException;
 
 /**
@@ -23,7 +25,7 @@
  * @author Pankaj
  */
 public class DeviceStorageImpl implements IDeviceStorage {
-	protected final static Logger log = LoggerFactory.getLogger(SwitchStorageImpl.class);
+	protected final static Logger log = LoggerFactory.getLogger(DeviceStorageImpl.class);
 	
 	private GraphDBOperation ope;
 
@@ -110,19 +112,26 @@
 	@Override
 	public void removeDevice(IDevice device) {
 		IDeviceObject dev;
+		
+		if ((dev = ope.searchDevice(device.getMACAddressString())) != null) {
+			removeDevice(dev);
+		}
+	}
+	
+	public void removeDevice(IDeviceObject deviceObject) {
+		String deviceMac = deviceObject.getMACAddress();
+
+		for (IIpv4Address ipv4AddressVertex : deviceObject.getIpv4Addresses()) {
+			ope.removeIpv4Address(ipv4AddressVertex);
+		}
+
 		try {
-			if ((dev = ope.searchDevice(device.getMACAddressString())) != null) {
-				for (IIpv4Address ipv4AddressVertex : dev.getIpv4Addresses()) {
-					ope.removeIpv4Address(ipv4AddressVertex);
-				}
-				
-             	ope.removeDevice(dev);
-             	ope.commit();
-            	log.error("DeviceStorage:removeDevice mac:{} done", device.getMACAddressString());
-            }
+			ope.removeDevice(deviceObject);
+			ope.commit();
+			log.debug("DeviceStorage:removeDevice mac:{} done", deviceMac);
 		} catch (TitanException e) {
 			ope.rollback();
-			log.error("DeviceStorage:removeDevice mac:{} failed", device.getMACAddressString());
+			log.error("DeviceStorage:removeDevice mac:{} failed", deviceMac);
 		}
 	}
 
@@ -212,6 +221,14 @@
 		for (IPortObject port: attachedPorts) {
 			log.debug("Detaching the device {}: detaching from port", device.getMACAddressString());
 			port.removeDevice(obj);
+			
+			if (!obj.getAttachedPorts().iterator().hasNext()) {
+				// XXX If there are no more ports attached to the device,
+				// delete it. Otherwise we have a situation where the
+				// device remains forever with an IP address attached.
+				// When we implement device probing we should get rid of this.
+				removeDevice(obj);
+			}
 		}
 	}
 
@@ -239,9 +256,30 @@
 	}
 	
 	private void changeDeviceIpv4Addresses(IDevice device, IDeviceObject deviceObject) {
+		List<String> dbIpv4Addresses = new ArrayList<String>();
+		for (IIpv4Address ipv4Vertex : deviceObject.getIpv4Addresses()) {
+			dbIpv4Addresses.add(InetAddresses.fromInteger(ipv4Vertex.getIpv4Address()).getHostAddress());
+		}
+		
+		List<String> memIpv4Addresses = new ArrayList<String>();
+		for (int addr : device.getIPv4Addresses()) {
+			memIpv4Addresses.add(InetAddresses.fromInteger(addr).getHostAddress());
+		}
+		
+		log.debug("Device IP addresses {}, database IP addresses {}",
+				memIpv4Addresses, dbIpv4Addresses);
+		
 		for (int ipv4Address : device.getIPv4Addresses()) {
 			if (deviceObject.getIpv4Address(ipv4Address) == null) {
 				IIpv4Address dbIpv4Address = ope.ensureIpv4Address(ipv4Address);
+				
+				IDeviceObject oldDevice = dbIpv4Address.getDevice();
+				if (oldDevice != null) {
+					oldDevice.removeIpv4Address(dbIpv4Address);
+				}
+				
+				log.debug("Adding IP address {}", 
+						InetAddresses.fromInteger(ipv4Address).getHostAddress());
 				deviceObject.addIpv4Address(dbIpv4Address);
 			}
 		}
@@ -249,6 +287,9 @@
 		List<Integer> deviceIpv4Addresses = Arrays.asList(device.getIPv4Addresses());
 		for (IIpv4Address dbIpv4Address : deviceObject.getIpv4Addresses()) {
 			if (!deviceIpv4Addresses.contains(dbIpv4Address.getIpv4Address())) {
+				log.debug("Removing IP address {}",
+						InetAddresses.fromInteger(dbIpv4Address.getIpv4Address())
+						.getHostAddress());
 				deviceObject.removeIpv4Address(dbIpv4Address);
 			}
 		}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
index 6377605..59f59b7 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
@@ -6,9 +6,10 @@
 import net.floodlightcontroller.core.IOFSwitch;
 import net.onrc.onos.graph.GraphDBConnection;
 import net.onrc.onos.graph.GraphDBOperation;
-import net.onrc.onos.ofcontroller.core.ISwitchStorage;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IDeviceObject;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
+import net.onrc.onos.ofcontroller.core.ISwitchStorage;
 
 import org.openflow.protocol.OFPhysicalPort;
 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
@@ -146,7 +147,8 @@
 			}
 	
 			for (OFPhysicalPort port: sw.getPorts()) {
-				IPortObject p = op.searchPort(dpid, port.getPortNumber());
+				addPort(dpid, port);
+				/*IPortObject p = op.searchPort(dpid, port.getPortNumber());
 				if (p != null) {
 		    		log.debug("SwitchStorage:addPort dpid:{} port:{} exists", dpid, port.getPortNumber());
 		    		setPortStateImpl(p, port.getState(), port.getName());
@@ -158,7 +160,7 @@
 				} else {
 					p = addPortImpl(curr, port.getPortNumber());
 					setPortStateImpl(p, port.getState(), port.getName());
-				}         		
+				} */        		
 			}
 			op.commit();
 			success = true;
@@ -290,6 +292,9 @@
 	public boolean addPort(String dpid, OFPhysicalPort phport) {
 		boolean success = false;
 		
+		DeviceStorageImpl deviceStorage = new DeviceStorageImpl();
+		deviceStorage.init("");
+		
 		if(((OFPortConfig.OFPPC_PORT_DOWN.getValue() & phport.getConfig()) > 0) ||
 				((OFPortState.OFPPS_LINK_DOWN.getValue() & phport.getState()) > 0)) {
 			// just dispatch to deletePort()
@@ -306,6 +311,18 @@
 	        	log.info("SwitchStorage:addPort dpid:{} port:{}", dpid, phport.getPortNumber());
 	        	if (p != null) {
 	        		setPortStateImpl(p, phport.getState(), phport.getName());
+	        		
+	        		if (sw.getPort(phport.getPortNumber()) == null) {
+		    			// The port exists but the switch has no "on" link to it
+		    			sw.addPort(p);
+		    		}
+	        		
+	        		// XXX for now delete devices when we change a port to prevent
+	        		// having stale devices.
+	        		for (IDeviceObject deviceObject : p.getDevices()) {
+	        			deviceStorage.removeDevice(deviceObject);
+	        		}
+	        		
 	        		log.error("SwitchStorage:addPort dpid:{} port:{} exists setting as ACTIVE", dpid, phport.getPortNumber());
 	        	} else {
 	        		addPortImpl(sw, phport.getPortNumber());
@@ -334,6 +351,9 @@
 	public boolean deletePort(String dpid, short port) {
 		boolean success = false;
 		
+		DeviceStorageImpl deviceStorage = new DeviceStorageImpl();
+		deviceStorage.init("");
+		
 		try {
 			ISwitchObject sw = op.searchSwitch(dpid);
 	
@@ -343,10 +363,17 @@
 	        		log.info("SwitchStorage:deletePort dpid:{} port:{} found and set INACTIVE", dpid, port);
 	        		//deletePortImpl(p);
 	        		p.setState("INACTIVE");
+	        		
+	        		// XXX for now delete devices when we change a port to prevent
+	        		// having stale devices.
+	        		for (IDeviceObject d : p.getDevices()) {
+	        			deviceStorage.removeDevice(d);
+	        		}
 	        		op.commit();
 	        	}
 	        }
-		success = true;
+	        
+	        success = true;
 		} catch (Exception e) {
 			op.rollback();
 			e.printStackTrace();
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/module/OnosModuleLoader.java b/src/main/java/net/onrc/onos/ofcontroller/core/module/OnosModuleLoader.java
index f3d0da5..6b8b514 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/module/OnosModuleLoader.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/module/OnosModuleLoader.java
@@ -11,9 +11,9 @@
 import net.floodlightcontroller.core.module.FloodlightModuleException;
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.devicemanager.IDeviceService;
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.floodlightcontroller.topology.ITopologyService;
+import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.ofcontroller.core.config.DefaultConfiguration;
 import net.onrc.onos.ofcontroller.core.config.IConfigInfoService;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
@@ -24,10 +24,10 @@
 public class OnosModuleLoader implements IFloodlightModule {
 	private IFloodlightProviderService floodlightProvider;
 	private ITopologyService topology;
-	private IDeviceService deviceService;
 	private IConfigInfoService config;
 	private IRestApiService restApi;
 	private IFlowService flowService;
+	private IDatagridService datagrid;
 
 	private ProxyArpManager arpManager;
 	private Forwarding forwarding;
@@ -59,10 +59,9 @@
 				new ArrayList<Class<? extends IFloodlightService>>();
 		dependencies.add(IFloodlightProviderService.class);
 		dependencies.add(ITopologyService.class);
-		dependencies.add(IDeviceService.class);
-		//dependencies.add(IConfigInfoService.class);
 		dependencies.add(IRestApiService.class);
 		dependencies.add(IFlowService.class);
+		dependencies.add(IDatagridService.class);
 		return dependencies;
 	}
 
@@ -71,9 +70,9 @@
 			throws FloodlightModuleException {
 		floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
 		topology = context.getServiceImpl(ITopologyService.class);
-		deviceService = context.getServiceImpl(IDeviceService.class);
 		restApi = context.getServiceImpl(IRestApiService.class);
 		flowService = context.getServiceImpl(IFlowService.class);
+		datagrid = context.getServiceImpl(IDatagridService.class);
 		
 		//This could be null because it's not mandatory to have an
 		//IConfigInfoService loaded.
@@ -82,8 +81,8 @@
 			config = new DefaultConfiguration();
 		}
 
-		arpManager.init(floodlightProvider, topology, deviceService, config, restApi);
-		forwarding.init(floodlightProvider, flowService);
+		arpManager.init(floodlightProvider, topology, datagrid, config, restApi);
+		forwarding.init(floodlightProvider, flowService, datagrid);
 	}
 
 	@Override
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 552d2b5..0927e49 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -126,6 +126,8 @@
 		Iterable<IFlowEntry> allFlowEntries =
 		    dbHandlerInner.getAllSwitchNotUpdatedFlowEntries();
 		for (IFlowEntry flowEntryObj : allFlowEntries) {
+			log.debug("flowEntryObj : {}", flowEntryObj);
+			
 		    counterAllFlowEntries++;
 
 		    String dpidStr = flowEntryObj.getSwitchDpid();
@@ -160,6 +162,8 @@
 		    }
 		    counterMyNotUpdatedFlowEntries++;
 		}
+		
+		log.debug("addFlowEntries : {}", addFlowEntries);
 
 		//
 		// Process the Flow Entries that need to be added
@@ -481,6 +485,7 @@
      *
      * @return the next Flow Entry ID to use.
      */
+    @Override
     public synchronized long getNextFlowEntryId() {
 	//
 	// Generate the next Flow Entry ID.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
index 8d362d1..f39acb5 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
@@ -120,4 +120,12 @@
      * @return the network topology.
      */
     Topology getTopology();
+    
+    /**
+     * Get a globally unique flow ID from the flow service.
+     * NOTE: Not currently guaranteed to be globally unique.
+     * 
+     * @return unique flow ID
+     */
+    public long getNextFlowEntryId();
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index d6bac5c..686bee0 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -8,17 +8,18 @@
 import net.floodlightcontroller.core.IOFSwitch;
 import net.floodlightcontroller.packet.Ethernet;
 import net.floodlightcontroller.util.MACAddress;
+import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.ofcontroller.core.IDeviceStorage;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IDeviceObject;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
 import net.onrc.onos.ofcontroller.core.internal.DeviceStorageImpl;
-import net.onrc.onos.ofcontroller.flowmanager.FlowManager;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
 import net.onrc.onos.ofcontroller.topology.TopologyManager;
 import net.onrc.onos.ofcontroller.util.CallerId;
 import net.onrc.onos.ofcontroller.util.DataPath;
 import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
 import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 import net.onrc.onos.ofcontroller.util.FlowPathType;
@@ -38,6 +39,7 @@
 
 	private IFloodlightProviderService floodlightProvider;
 	private IFlowService flowService;
+	private IDatagridService datagridService;
 	
 	private IDeviceStorage deviceStorage;
 	private TopologyManager topologyService;
@@ -47,9 +49,10 @@
 	}
 	
 	public void init(IFloodlightProviderService floodlightProvider, 
-			IFlowService flowService) {
+			IFlowService flowService, IDatagridService datagridService) {
 		this.floodlightProvider = floodlightProvider;
 		this.flowService = flowService;
+		this.datagridService = datagridService;
 		
 		floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
 		
@@ -92,8 +95,9 @@
 		Ethernet eth = IFloodlightProviderService.bcStore.
 				get(cntx, IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
 		
-		// We don't want to handle broadcast traffic
-		if (eth.isBroadcast()) {
+		// We only want to handle unicast IPv4
+		if (eth.isBroadcast() || eth.isMulticast() || 
+				eth.getEtherType() != Ethernet.TYPE_IPv4) {
 			return Command.CONTINUE;
 		}
 		
@@ -127,31 +131,67 @@
 				new Dpid(sw.getId()), new Port(pi.getInPort())); 
 		SwitchPort dstSwitchPort = new SwitchPort(
 				new Dpid(destinationDpid), new Port(destinationPort)); 
-		DataPath shortestPath = 
-				topologyService.getDatabaseShortestPath(srcSwitchPort, dstSwitchPort);
-		
-		if (shortestPath == null) {
-			log.debug("Shortest path not found between {} and {}", 
-					srcSwitchPort, dstSwitchPort);
-			return;
-		}
-		
+				
 		MACAddress srcMacAddress = MACAddress.valueOf(eth.getSourceMACAddress());
 		MACAddress dstMacAddress = MACAddress.valueOf(eth.getDestinationMACAddress());
 		
-		FlowId flowId = new FlowId(1L); //dummy flow ID
+		if (flowExists(srcSwitchPort, srcMacAddress, 
+				dstSwitchPort, dstMacAddress)) {
+			log.debug("Not adding flow because it already exists");
+			
+			// Don't do anything if the flow already exists
+			return;
+		}
+		
+		log.debug("Adding new flow between {} at {} and {} at {}",
+				new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
+		
+		
+		DataPath dataPath = new DataPath();
+		dataPath.setSrcPort(srcSwitchPort);
+		dataPath.setDstPort(dstSwitchPort);
+		
+		FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
 		FlowPath flowPath = new FlowPath();
 		flowPath.setFlowId(flowId);
 		flowPath.setInstallerId(new CallerId("Forwarding"));
 		flowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
 		flowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
+		flowPath.setFlowEntryMatch(new FlowEntryMatch());
 		flowPath.flowEntryMatch().enableSrcMac(srcMacAddress);
 		flowPath.flowEntryMatch().enableDstMac(dstMacAddress);
 		// For now just forward IPv4 packets. This prevents accidentally
 		// other stuff like ARP.
 		flowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
+		flowPath.setDataPath(dataPath);
+		
 		flowService.addFlow(flowPath, flowId);
-		//flowService.addAndMaintainShortestPathFlow(shortestPath.)
+	}
+	
+	private boolean flowExists(SwitchPort srcPort, MACAddress srcMac, 
+			SwitchPort dstPort, MACAddress dstMac) {
+		for (FlowPath flow : datagridService.getAllFlows()) {
+			FlowEntryMatch match = flow.flowEntryMatch();
+			// TODO implement FlowEntryMatch.equals();
+			// This is painful to do properly without support in the FlowEntryMatch
+			boolean same = true;
+			if (!match.srcMac().equals(srcMac) ||
+				!match.dstMac().equals(dstMac)) {
+				same = false;
+			}
+			if (!flow.dataPath().srcPort().equals(srcPort) || 
+				!flow.dataPath().dstPort().equals(dstPort)) {
+				same = false;
+			}
+			
+			if (same) {
+				log.debug("found flow entry that's the same {}-{}:::{}-{}",
+						new Object[] {srcPort, srcMac, dstPort, dstMac});
+				return true;
+			}
+		}
+		
+		return false;
 	}
 
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
new file mode 100644
index 0000000..51d92bd
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
@@ -0,0 +1,11 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+public interface IArpEventHandler {
+
+	/**
+	 * Notify the ARP event handler that an ARP request has been received.
+	 *
+	 * @param arpRequest The ARP request packet.
+	 */
+	public void arpRequestNotification(byte[] arpRequest);
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
index a5dabc9..426a455 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
@@ -17,16 +17,18 @@
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.IOFMessageListener;
 import net.floodlightcontroller.core.IOFSwitch;
-import net.floodlightcontroller.devicemanager.IDeviceService;
 import net.floodlightcontroller.packet.ARP;
 import net.floodlightcontroller.packet.Ethernet;
 import net.floodlightcontroller.packet.IPv4;
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.floodlightcontroller.topology.ITopologyService;
 import net.floodlightcontroller.util.MACAddress;
+import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.ofcontroller.bgproute.Interface;
 import net.onrc.onos.ofcontroller.core.IDeviceStorage;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IDeviceObject;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoLinkService;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoSwitchService;
 import net.onrc.onos.ofcontroller.core.config.IConfigInfoService;
 import net.onrc.onos.ofcontroller.core.internal.DeviceStorageImpl;
 
@@ -46,7 +48,8 @@
 import com.google.common.collect.SetMultimap;
 import com.google.common.net.InetAddresses;
 
-public class ProxyArpManager implements IProxyArpService, IOFMessageListener {
+public class ProxyArpManager implements IProxyArpService, IOFMessageListener,
+										IArpEventHandler {
 	private final static Logger log = LoggerFactory.getLogger(ProxyArpManager.class);
 	
 	private final long ARP_TIMER_PERIOD = 60000; //ms (== 1 min) 
@@ -55,11 +58,13 @@
 			
 	private IFloodlightProviderService floodlightProvider;
 	private ITopologyService topology;
-	private IDeviceService deviceService;
+	private IDatagridService datagrid;
 	private IConfigInfoService configService;
 	private IRestApiService restApi;
 	
 	private IDeviceStorage deviceStorage;
+	private ITopoSwitchService topoSwitchService;
+	private ITopoLinkService topoLinkService;
 	
 	private short vlan;
 	private static final short NO_VLAN = 0;
@@ -124,11 +129,11 @@
 	*/
 	
 	public void init(IFloodlightProviderService floodlightProvider,
-			ITopologyService topology, IDeviceService deviceService,
+			ITopologyService topology, IDatagridService datagrid,
 			IConfigInfoService config, IRestApiService restApi){
 		this.floodlightProvider = floodlightProvider;
 		this.topology = topology;
-		this.deviceService = deviceService;
+		this.datagrid = datagrid;
 		this.configService = config;
 		this.restApi = restApi;
 		
@@ -145,6 +150,8 @@
 		restApi.addRestletRoutable(new ArpWebRoutable());
 		floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
 		
+		datagrid.registerArpEventHandler(this);
+		
 		deviceStorage = new DeviceStorageImpl();
 		deviceStorage.init("");
 		
@@ -232,9 +239,10 @@
 	public Command receive(
 			IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
 		
-		if (msg.getType() != OFType.PACKET_IN){
-			return Command.CONTINUE;
-		}
+		//if (msg.getType() != OFType.PACKET_IN){
+			//return Command.CONTINUE;
+		//}
+		log.debug("received packet");
 		
 		OFPacketIn pi = (OFPacketIn) msg;
 		
@@ -242,16 +250,18 @@
                 IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
 		
 		if (eth.getEtherType() == Ethernet.TYPE_ARP){
-			ARP arp = (ARP) eth.getPayload();
-			
+			log.debug("is arp");
+			ARP arp = (ARP) eth.getPayload();	
 			if (arp.getOpCode() == ARP.OP_REQUEST) {
+				log.debug("is request");
 				//TODO check what the DeviceManager does about propagating
 				//or swallowing ARPs. We want to go after DeviceManager in the
 				//chain but we really need it to CONTINUE ARP packets so we can
 				//get them.
-				handleArpRequest(sw, pi, arp);
+				handleArpRequest(sw, pi, arp, eth);
 			}
 			else if (arp.getOpCode() == ARP.OP_REPLY) {
+				log.debug("is reply");
 				//handleArpReply(sw, pi, arp);
 			}
 		}
@@ -261,7 +271,7 @@
 		return Command.CONTINUE;
 	}
 	
-	private void handleArpRequest(IOFSwitch sw, OFPacketIn pi, ARP arp) {
+	private void handleArpRequest(IOFSwitch sw, OFPacketIn pi, ARP arp, Ethernet eth) {
 		if (log.isTraceEnabled()) {
 			log.trace("ARP request received for {}", 
 					inetAddressToString(arp.getTargetProtocolAddress()));
@@ -291,19 +301,13 @@
 		
 		//MACAddress macAddress = arpCache.lookup(target);
 		
-		//IDevice dstDevice = deviceService.fcStore.get(cntx, IDeviceService.CONTEXT_DST_DEVICE);
-		//Iterator<? extends IDevice> it = deviceService.queryDevices(
-				//null, null, InetAddresses.coerceToInteger(target), null, null);
-		
-		//IDevice targetDevice = null;
-		//if (it.hasNext()) {
-			//targetDevice = it.next();
-		//}
 		IDeviceObject targetDevice = 
 				deviceStorage.getDeviceByIP(InetAddresses.coerceToInteger(target));
 		
+		log.debug("targetDevice: {}", targetDevice);
+		
 		if (targetDevice != null) {
-			//We have the device in our database, so send a reply
+			// We have the device in our database, so send a reply
 			MACAddress macAddress = MACAddress.valueOf(targetDevice.getMACAddress());
 			
 			if (log.isTraceEnabled()) {
@@ -315,6 +319,16 @@
 			
 			sendArpReply(arp, sw.getId(), pi.getInPort(), macAddress);
 		}
+		else {
+			// We don't know the device so broadcast the request out
+			// the edge of the network
+			
+			//Record where the request came from so we know where to send the reply
+			arpRequests.put(target, new ArpRequest(
+					new HostArpRequester(arp, sw.getId(), pi.getInPort()), false));
+			
+			sendToOtherNodes(eth, pi);
+		}
 		
 		/*if (macAddress == null){
 			//MAC address is not in our ARP cache.
@@ -459,6 +473,15 @@
 		}
 	}
 	
+	private void sendToOtherNodes(Ethernet eth, OFPacketIn pi) {
+		ARP arp = (ARP) eth.getPayload();
+		if (log.isTraceEnabled()) {
+			log.trace("Sending ARP request for {} to other ONOS instances",
+					inetAddressToString(arp.getTargetProtocolAddress()));
+		}
+		datagrid.sendArpRequest(eth.serialize());
+	}
+	
 	private void broadcastArpRequestOutEdge(byte[] arpRequest, long inSwitch, short inPort) {
 		for (IOFSwitch sw : floodlightProvider.getSwitches().values()){
 			Collection<Short> enabledPorts = sw.getEnabledPortNumbers();
@@ -637,4 +660,14 @@
 	public List<String> getMappings() {
 		return arpCache.getMappings();
 	}
+
+	/*
+	 * IArpEventHandler methods
+	 */
+	
+	@Override
+	public void arpRequestNotification(byte[] arpRequest) {
+		log.debug("Received ARP notification from other instances");
+		broadcastArpRequestOutEdge(arpRequest, Long.MAX_VALUE, Short.MAX_VALUE);
+	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/Dpid.java b/src/main/java/net/onrc/onos/ofcontroller/util/Dpid.java
index c3cf3aa..bd91daa 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/Dpid.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/Dpid.java
@@ -67,4 +67,22 @@
     public String toString() {
 	return HexString.toHexString(this.value);
     }
+
+    @Override
+    public boolean equals(Object other) {
+    	if (!(other instanceof Dpid)) {
+    		return false;
+    	}
+
+    	Dpid otherDpid = (Dpid) other;
+
+    	return value == otherDpid.value;
+    }
+
+    @Override
+    public int hashCode() {
+    	int hash = 17;
+    	hash = 31 * hash + (int)(value ^ (value >>> 32));
+    	return hash;
+    }
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/Port.java b/src/main/java/net/onrc/onos/ofcontroller/util/Port.java
index bb4851c..fedc8df 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/Port.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/Port.java
@@ -131,4 +131,23 @@
     public String toString() {
 	return Short.toString(this.value);
     }
+    
+
+    @Override
+    public boolean equals(Object other) {
+    	if (!(other instanceof Port)) {
+    		return false;
+    	}
+
+    	Port otherPort = (Port) other;
+
+    	return value == otherPort.value;
+    }
+
+    @Override
+    public int hashCode() {
+    	int hash = 17;
+    	hash = 31 * hash + (int)value;
+    	return hash;
+    }
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/SwitchPort.java b/src/main/java/net/onrc/onos/ofcontroller/util/SwitchPort.java
index 95a934f..8912803 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/SwitchPort.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/SwitchPort.java
@@ -85,4 +85,25 @@
     public String toString() {
 	return this.dpid.toString() + "/" + this.port;
     }
+    
+
+    @Override
+    public boolean equals(Object other) {
+    	if (!(other instanceof SwitchPort)) {
+    		return false;
+    	}
+
+    	SwitchPort otherSwitchPort = (SwitchPort) other;
+
+    	return (dpid.equals(otherSwitchPort.dpid) &&
+    			port.equals(otherSwitchPort.port));
+    }
+
+    @Override
+    public int hashCode() {
+    	int hash = 17;
+    	hash = 31 * hash + dpid.hashCode();
+    	hash = 31 * hash + port.hashCode();
+    	return hash;
+    }
 }