Merge pull request #517 from y-higuchi/fix_tcpkill_detection

Fix tcpkill detection method
diff --git a/cluster-mgmt/template/onsdemo_edge_template.py b/cluster-mgmt/template/onsdemo_edge_template.py
index c3d0287..b5a76f5 100755
--- a/cluster-mgmt/template/onsdemo_edge_template.py
+++ b/cluster-mgmt/template/onsdemo_edge_template.py
@@ -113,7 +113,7 @@
     #  controllers.append(rc)
 
     #net.controllers=controllers
-    net.build()
+    #net.build()
 
     host = []
     for i in range (NR_NODES):
diff --git a/kryo2/.gitignore b/kryo2/.gitignore
deleted file mode 100644
index 916e17c..0000000
--- a/kryo2/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-dependency-reduced-pom.xml
diff --git a/kryo2/pom.xml b/kryo2/pom.xml
deleted file mode 100644
index 788f952..0000000
--- a/kryo2/pom.xml
+++ /dev/null
@@ -1,68 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <groupId>net.onrc.onos</groupId>
-  <artifactId>kryo2</artifactId>
-  <version>2.22</version>
-  <packaging>jar</packaging>
-
-  <name>kryo2</name>
-  <url>http://maven.apache.org</url>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>com.esotericsoftware.kryo</groupId>
-      <artifactId>kryo</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-  </dependencies>
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <version>2.1</version>
-        <configuration>
-              <relocations>
-                <relocation>
-                  <pattern>com.esotericsoftware.kryo</pattern>
-                  <shadedPattern>com.esotericsoftware.kryo2</shadedPattern>
-                  <excludes>
-                  </excludes>
-                </relocation>
-              </relocations>
-        </configuration>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>exec-maven-plugin</artifactId>
-        <version>1.2.1</version>
-        <executions>
-          <execution>
-            <id>kryo2</id>
-            <goals>
-              <goal>exec</goal>
-            </goals>
-          </execution>
-        </executions>
-            <configuration>
-              <executable>mvn</executable>
-              <commandlineArgs>install:install-file -DlocalRepositoryPath=${basedir}/../repo -DcreateChecksum=true -Dpackaging=jar -Dfile=${basedir}/target/${project.build.finalName}.jar -DgroupId=${project.groupId} -DartifactId=${project.artifactId} -Dversion=${project.version}</commandlineArgs>
-            </configuration>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/pom.xml b/pom.xml
index 39e9f30..6a1b29d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -265,22 +265,11 @@
     </plugins>
   </reporting>
   <dependencies>
-    <!-- Commenting out original kryo 2.X
-         and using shaded version (net.onrc.onos.kryo2)
-         to workaround conflict with kryo 1.X in titan's dependency.(#443)
-    -->
-    <!--
     <dependency>
       <groupId>com.esotericsoftware.kryo</groupId>
       <artifactId>kryo</artifactId>
       <version>2.22</version>
     </dependency>
-    -->
-    <dependency>
-      <groupId>net.onrc.onos</groupId>
-      <artifactId>kryo2</artifactId>
-      <version>2.22</version>
-    </dependency>
     <!-- ONOS's direct dependencies -->
     <dependency>
       <groupId>org.apache.cassandra</groupId>
@@ -291,7 +280,7 @@
     <dependency>
       <groupId>com.thinkaurelius.titan</groupId>
       <artifactId>titan-all</artifactId>
-      <version>0.2.1</version>
+      <version>0.4.2</version>
       <exclusions>
 	<exclusion>
           <groupId>org.slf4j</groupId>
@@ -302,12 +291,12 @@
     <dependency>
       <groupId>com.tinkerpop</groupId>
       <artifactId>frames</artifactId>
-      <version>2.3.1</version>
+      <version>2.4.0</version>
     </dependency>
     <dependency>
       <groupId>com.tinkerpop.blueprints</groupId>
       <artifactId>blueprints-core</artifactId>
-      <version>2.3.0</version>
+      <version>2.4.0</version>
     </dependency>
     <dependency>
       <groupId>com.hazelcast</groupId>
diff --git a/rebuild-local-repo.sh b/rebuild-local-repo.sh
index 4ea1fbf..1baa27b 100755
--- a/rebuild-local-repo.sh
+++ b/rebuild-local-repo.sh
@@ -15,11 +15,6 @@
     MVN="mvn"
 fi
 
-# Install Kryo2 workaround to local repo
-# - Shaded(rename package name to allow mixing 2 different Kryo version)
-# - Install created sharded jar to local repo
-${MVN} -f kryo2/pom.xml package exec:exec
-
 # Install modified curators to local repo
 ${MVN} install:install-file -Dfile=./curator/curator-framework-1.3.5-SNAPSHOT.jar -DgroupId=com.netflix.curator -DartifactId=curator-framework -Dversion=1.3.5-SNAPSHOT -Dpackaging=jar -DgeneratePom=true -DlocalRepositoryPath=./repo -DcreateChecksum=true
 ${MVN} install:install-file -Dfile=./curator/curator-client-1.3.5-SNAPSHOT.jar -DgroupId=com.netflix.curator -DartifactId=curator-client -Dversion=1.3.5-SNAPSHOT -Dpackaging=jar -DgeneratePom=true -DlocalRepositoryPath=./repo -DcreateChecksum=true
diff --git a/scripts/all-linkup.sh b/scripts/all-linkup.sh
index 9067012..290a17d 100755
--- a/scripts/all-linkup.sh
+++ b/scripts/all-linkup.sh
@@ -3,11 +3,6 @@
 controller=`hostname`
 switches=`sudo ovs-vsctl list-br`
 
-function host2ip (){
-   ip=`grep $1 /etc/hosts |grep -v "ip6"|  awk '{print $1}'`
-   echo $ip
-}
-
 for s in $switches; do
   ports=`sudo ovs-vsctl --pretty list-ports $s`
   for p in $ports; do
diff --git a/scripts/ctrl-local.sh b/scripts/ctrl-local.sh
index 023a9db..1418a2c 100755
--- a/scripts/ctrl-local.sh
+++ b/scripts/ctrl-local.sh
@@ -1,9 +1,8 @@
 #! /bin/bash
 controller=`hostname`
-switches=`ifconfig -a | grep sw |grep -v eth | awk '{print $1}'`
-
+switches=`sudo ovs-vsctl list-br`
 function host2ip (){
-   ip=`grep $1 /etc/hosts |grep -v "ip6"|  awk '{print $1}'`
+   ip=`getent hosts $1 |  awk '{print $1}' | tail -n 1`
    echo $ip
 }
 
diff --git a/scripts/ctrl-none.sh b/scripts/ctrl-none.sh
index 74349e3..fb8d2a0 100755
--- a/scripts/ctrl-none.sh
+++ b/scripts/ctrl-none.sh
@@ -1,9 +1,8 @@
 #! /bin/bash
 controller=""
-switches=`ifconfig -a | grep sw |grep -v eth | awk '{print $1}'`
-
+switches=`sudo ovs-vsctl list-br`
 function host2ip (){
-   ip=`grep $1 /etc/hosts |grep -v "ip6"|  awk '{print $1}'`
+   ip=`getent hosts $1 |  awk '{print $1}' | tail -n 1`
    echo $ip
 }
 
diff --git a/scripts/ctrl-one.sh b/scripts/ctrl-one.sh
index 207d3f2..9fe8341 100755
--- a/scripts/ctrl-one.sh
+++ b/scripts/ctrl-one.sh
@@ -8,10 +8,9 @@
 
 #controller=`hostname`
 controller=$1
-switches=`ifconfig -a | grep sw |grep -v eth | awk '{print $1}'`
-
+switches=`sudo ovs-vsctl list-br`
 function host2ip (){
-   ip=`grep $1 /etc/hosts |grep -v "ip6"|  awk '{print $1}'`
+   ip=`getent hosts $1 |  awk '{print $1}' | tail -n 1`
    echo $ip
 }
 
diff --git a/scripts/link.sh b/scripts/link.sh
index dc202e7..57323ef 100755
--- a/scripts/link.sh
+++ b/scripts/link.sh
@@ -1,7 +1,7 @@
 #! /bin/bash
 
 controller=`hostname`
-switches=`ifconfig -a | grep sw |grep -v eth | awk '{print $1}'`
+switches=`sudo ovs-vsctl list-br`
 
 function host2ip (){
    ip=`grep $1 /etc/hosts |grep -v "ip6"|  awk '{print $1}'`
@@ -12,14 +12,16 @@
 
 if [ $# != 3 ];then
  echo "usage: $0 <dpid> <port> <up|down>"
+ echo " example: $0 00:00:00:00:ba:5e:ba:11 1 up"
+ exit
 fi
 
-src_dpid="dpid:"`echo $1 | sed s'/://g'`
+src_dpid=`echo $1 | sed s'/://g'`
 src_port=$2
 cmd=$3
 
 for s in $switches; do
-    dpid=`sudo ovs-ofctl  show  $s |grep dpid | awk '{print $4}'`
+    dpid=`sudo ovs-ofctl show  $s |grep dpid | awk '{if(match($0,/dpid:[0-9|a-d]*/)){ print substr($0,RSTART+5,RLENGTH)}}'`
     if [  "x$dpid" == "x$src_dpid" ]; then
 
 #       intf=`sudo ovs-ofctl show $s |grep addr | awk -v p=$src_port 'BEGIN {pat="^ "p"\("}
diff --git a/scripts/showdpid.sh b/scripts/showdpid.sh
index 1dff291..14b6345 100755
--- a/scripts/showdpid.sh
+++ b/scripts/showdpid.sh
@@ -2,17 +2,6 @@
 controller=""
 #switches=`ifconfig -a | grep sw |grep -v eth | awk '{print $1}'`
 switches=`sudo ovs-vsctl list-br`
-
-function host2ip (){
-   ip=`grep $1 /etc/hosts |grep -v "ip6"|  awk '{print $1}'`
-   echo $ip
-}
-
-url=""
-for c in $controller; do
-  url="$url tcp:`host2ip $c`:6633"
-done
-echo $url
 for s in $switches; do
     echo -n "$s : "
     sudo ovs-ofctl  show  $s |grep dpid
diff --git a/scripts/showflow.sh b/scripts/showflow.sh
index 15824d7..63b82b7 100755
--- a/scripts/showflow.sh
+++ b/scripts/showflow.sh
@@ -2,11 +2,6 @@
 controller=""
 switches=`sudo ovs-vsctl list-br`
 
-function host2ip (){
-   ip=`grep $1 /etc/hosts |grep -v "ip6"|  awk '{print $1}'`
-   echo $ip
-}
-
 dpids=()
 for s in $switches; do
     i=`sudo ovs-ofctl  show  $s |grep dpid | awk -F ":" '{print $4}'`
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 6483121..5be4191 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -18,8 +18,10 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.onrc.onos.datagrid.web.DatagridWebRoutable;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
-import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
-import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
+import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
@@ -30,9 +32,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.esotericsoftware.kryo2.Kryo;
-import com.esotericsoftware.kryo2.io.Input;
-import com.esotericsoftware.kryo2.io.Output;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import com.hazelcast.config.Config;
 import com.hazelcast.config.FileSystemXmlConfig;
 import com.hazelcast.core.EntryEvent;
@@ -78,13 +80,19 @@
     private IMap<String, byte[]> mapTopology = null;
     private MapTopologyListener mapTopologyListener = null;
     private String mapTopologyListenerId = null;
-    
-    // State related to the ARP map
-    protected static final String arpMapName = "arpMap";
-    private IMap<ArpMessage, byte[]> arpMap = null;
-    private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
+
+    // State related to the packet out map
+    protected static final String packetOutMapName = "packetOutMap";
+    private IMap<PacketOutNotification, byte[]> packetOutMap = null;
+    private List<IPacketOutEventHandler> packetOutEventHandlers = new ArrayList<IPacketOutEventHandler>();
+
     private final byte[] dummyByte = {0};
 
+    // State related to the ARP reply map
+    protected static final String arpReplyMapName = "arpReplyMap";
+    private IMap<ArpReplyNotification, byte[]> arpReplyMap = null;
+    private List<IArpReplyEventHandler> arpReplyEventHandlers = new ArrayList<IArpReplyEventHandler>();
+
     /**
      * Class for receiving notifications for Flow state.
      *
@@ -98,8 +106,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryAdded(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -116,8 +125,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryRemoved(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -134,8 +144,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryUpdated(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -152,6 +163,7 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryEvicted(EntryEvent<Long, byte[]> event) {
 	    // NOTE: We don't use eviction for this map
 	}
@@ -170,8 +182,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryAdded(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -188,8 +201,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryRemoved(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -206,8 +220,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryUpdated(EntryEvent<Long, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -224,6 +239,7 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryEvicted(EntryEvent<Long, byte[]> event) {
 	    // NOTE: We don't use eviction for this map
 	}
@@ -242,8 +258,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryAdded(EntryEvent<String, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -261,8 +278,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryRemoved(EntryEvent<String, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -280,8 +298,9 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryUpdated(EntryEvent<String, byte[]> event) {
-	    byte[] valueBytes = (byte[])event.getValue();
+	    byte[] valueBytes = event.getValue();
 
 	    //
 	    // Decode the value and deliver the notification
@@ -299,71 +318,93 @@
 	 *
 	 * @param event the notification event for the entry.
 	 */
+	@Override
 	public void entryEvicted(EntryEvent<String, byte[]> event) {
 	    // NOTE: We don't use eviction for this map
 	}
     }
-    
+
     /**
-     * Class for receiving notifications for ARP requests.
+     * Class for receiving notifications for sending packet-outs.
      *
      * The datagrid map is:
-     *  - Key: Request ID (String)
-     *  - Value: ARP request packet (byte[])
+     *  - Key: Packet-out to send (PacketOutNotification)
+     *  - Value: dummy value (we only need the key) (byte[])
      */
-    class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
+    class PacketOutMapListener implements EntryListener<PacketOutNotification, byte[]> {
 		/**
 		 * Receive a notification that an entry is added.
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
-		    for (IArpEventHandler arpEventHandler : arpEventHandlers) {
-		    	arpEventHandler.arpRequestNotification(event.getKey());
+		@Override
+		public void entryAdded(EntryEvent<PacketOutNotification, byte[]> event) {
+		    for (IPacketOutEventHandler packetOutEventHandler : packetOutEventHandlers) {
+		    	packetOutEventHandler.packetOutNotification(event.getKey());
 		    }
-		    
-		    //
-		    // Decode the value and deliver the notification
-		    //
-		    /*
-		    Kryo kryo = kryoFactory.newKryo();
-		    Input input = new Input(valueBytes);
-		    TopologyElement topologyElement =
-			kryo.readObject(input, TopologyElement.class);
-		    kryoFactory.deleteKryo(kryo);
-		    flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
-		    */
 		}
-	
+
 		/**
 		 * Receive a notification that an entry is removed.
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
+		@Override
+		public void entryRemoved(EntryEvent<PacketOutNotification, byte[]> event) {
 			// Not used
 		}
-	
+
 		/**
 		 * Receive a notification that an entry is updated.
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
+		@Override
+		public void entryUpdated(EntryEvent<PacketOutNotification, byte[]> event) {
 			// Not used
 		}
-	
+
 		/**
 		 * Receive a notification that an entry is evicted.
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
+		@Override
+		public void entryEvicted(EntryEvent<PacketOutNotification, byte[]> event) {
 		    // Not used
 		}
     }
 
     /**
+     * Class for receiving notifications for sending packet-outs.
+     *
+     * The datagrid map is:
+     *  - Key: Packet-out to send (PacketOutNotification)
+     *  - Value: dummy value (we only need the key) (byte[])
+     */
+    class ArpReplyMapListener implements EntryListener<ArpReplyNotification, byte[]> {
+		/**
+		 * Receive a notification that an entry is added.
+		 *
+		 * @param event the notification event for the entry.
+		 */
+		@Override
+		public void entryAdded(EntryEvent<ArpReplyNotification, byte[]> event) {
+		    for (IArpReplyEventHandler arpReplyEventHandler : arpReplyEventHandlers) {
+		    	arpReplyEventHandler.arpReplyEvent(event.getKey());
+		    }
+		}
+
+		// These methods aren't used for ARP replies
+		@Override
+		public void entryRemoved(EntryEvent<ArpReplyNotification, byte[]> event) {}
+		@Override
+		public void entryUpdated(EntryEvent<ArpReplyNotification, byte[]> event) {}
+		@Override
+		public void entryEvicted(EntryEvent<ArpReplyNotification, byte[]> event) {}
+    }
+
+    /**
      * Initialize the Hazelcast Datagrid operation.
      *
      * @param conf the configuration filename.
@@ -374,7 +415,7 @@
 	System.setProperty("hazelcast.socket.send.buffer.size", "32");
 	*/
 	// System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
-	
+
 	// Init from configuration file
 	try {
 	    hazelcastConfig = new FileSystemXmlConfig(configFilename);
@@ -395,7 +436,8 @@
     /**
      * Shutdown the Hazelcast Datagrid operation.
      */
-    public void finalize() {
+    @Override
+    protected void finalize() {
 	close();
     }
 
@@ -413,7 +455,7 @@
      */
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleServices() {
-        Collection<Class<? extends IFloodlightService>> l = 
+        Collection<Class<? extends IFloodlightService>> l =
             new ArrayList<Class<? extends IFloodlightService>>();
         l.add(IDatagridService.class);
         return l;
@@ -425,10 +467,10 @@
      * @return the collection of implemented services.
      */
     @Override
-    public Map<Class<? extends IFloodlightService>, IFloodlightService> 
+    public Map<Class<? extends IFloodlightService>, IFloodlightService>
 			       getServiceImpls() {
         Map<Class<? extends IFloodlightService>,
-	    IFloodlightService> m = 
+	    IFloodlightService> m =
             new HashMap<Class<? extends IFloodlightService>,
                 IFloodlightService>();
         m.put(IDatagridService.class, this);
@@ -441,7 +483,7 @@
      * @return the collection of modules this module depends on.
      */
     @Override
-    public Collection<Class<? extends IFloodlightService>> 
+    public Collection<Class<? extends IFloodlightService>>
                                                     getModuleDependencies() {
 	Collection<Class<? extends IFloodlightService>> l =
 	    new ArrayList<Class<? extends IFloodlightService>>();
@@ -477,9 +519,12 @@
 	hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
 
 	restApi.addRestletRoutable(new DatagridWebRoutable());
-	
-	arpMap = hazelcastInstance.getMap(arpMapName);
-	arpMap.addEntryListener(new ArpMapListener(), true);
+
+	packetOutMap = hazelcastInstance.getMap(packetOutMapName);
+	packetOutMap.addEntryListener(new PacketOutMapListener(), true);
+
+	arpReplyMap = hazelcastInstance.getMap(arpReplyMapName);
+	arpReplyMap.addEntryListener(new ArpReplyMapListener(), true);
     }
 
     /**
@@ -538,19 +583,31 @@
 
 	this.flowEventHandlerService = null;
     }
-    
+
     @Override
-    public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
-    	if (arpEventHandler != null) {
-    		arpEventHandlers.add(arpEventHandler);
+    public void registerPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
+    	if (packetOutEventHandler != null) {
+    		packetOutEventHandlers.add(packetOutEventHandler);
     	}
     }
-    
+
     @Override
-    public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
-    	arpEventHandlers.remove(arpEventHandler);
+    public void deregisterPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler) {
+    	packetOutEventHandlers.remove(packetOutEventHandler);
     }
-    
+
+    @Override
+    public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
+    	if (arpReplyEventHandler != null) {
+    		arpReplyEventHandlers.add(arpReplyEventHandler);
+    	}
+    }
+
+    @Override
+    public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler) {
+    	arpReplyEventHandlers.remove(arpReplyEventHandler);
+    }
+
     /**
      * Get all Flows that are currently in the datagrid.
      *
@@ -883,10 +940,14 @@
 	    mapTopology.removeAsync(key);
 	}
     }
-    
+
     @Override
-    public void sendArpRequest(ArpMessage arpMessage) {
-    	//log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
-     	arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
+    public void sendPacketOutNotification(PacketOutNotification packetOutNotification) {
+     	packetOutMap.putAsync(packetOutNotification, dummyByte, 1L, TimeUnit.MILLISECONDS);
     }
+
+	@Override
+	public void sendArpReplyNotification(ArpReplyNotification arpReply) {
+		arpReplyMap.putAsync(arpReply, dummyByte, 1L, TimeUnit.MILLISECONDS);
+	}
 }
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 0f03d77..1478129 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -4,8 +4,10 @@
 
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
-import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
-import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
+import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
+import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
@@ -38,18 +40,32 @@
     void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService);
 
     /**
-     * Register event handler for ARP events.
+     * Register event handler for packet-out events.
      * 
-     * @param arpEventHandler The ARP event handler to register.
+     * @param packetOutEventHandler The packet-out event handler to register.
      */
-    public void registerArpEventHandler(IArpEventHandler arpEventHandler);
+    public void registerPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler);
     
     /**
-     * De-register event handler service for ARP events.
+     * Deregister event handler service for packet-out events.
      * 
-     * @param arpEventHandler The ARP event handler to de-register.
+     * @param packetOutEventHandler The packet-out event handler to deregister.
      */
-    public void deregisterArpEventHandler(IArpEventHandler arpEventHandler);
+    public void deregisterPacketOutEventHandler(IPacketOutEventHandler packetOutEventHandler);
+    
+    /**
+     * Register event handler for ARP reply events.
+     * 
+     * @param packetOutEventHandler The ARP reply event handler to register.
+     */
+    public void registerArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler);
+    
+    /**
+     * Deregister event handler service for ARP reply events.
+     * 
+     * @param packetOutEventHandler The ARP reply event handler to deregister.
+     */
+    public void deregisterArpReplyEventHandler(IArpReplyEventHandler arpReplyEventHandler);
 
     /**
      * Get all Flows that are currently in the datagrid.
@@ -167,8 +183,21 @@
     void notificationSendAllTopologyElementsRemoved();
     
     /**
-     * Send an ARP request to other ONOS instances
-     * @param arpRequest The request packet to send
+     * Send a packet-out notification to other ONOS instances. This informs
+     * other instances that they should send this packet out some of the ports
+     * they control. Not all notifications are applicable to all instances 
+     * (i.e. some notifications specify a single port to send the packet out),
+     * so each instance must determine whether it needs to take action when it
+     * receives the notification.
+     * 
+     * @param packetOutNotification The packet notification to send
      */
-    public void sendArpRequest(ArpMessage arpMessage);  
+    public void sendPacketOutNotification(PacketOutNotification packetOutNotification);
+    
+    /**
+     * Send notification to other ONOS instances that an ARP reply has been 
+     * received.
+     * @param arpReply The notification of the ARP reply
+     */
+    public void sendArpReplyNotification(ArpReplyNotification arpReply);
 }
diff --git a/src/main/java/net/onrc/onos/datagrid/web/GetMapResource.java b/src/main/java/net/onrc/onos/datagrid/web/GetMapResource.java
index 8c8e1cd..6a56b2f 100644
--- a/src/main/java/net/onrc/onos/datagrid/web/GetMapResource.java
+++ b/src/main/java/net/onrc/onos/datagrid/web/GetMapResource.java
@@ -47,7 +47,7 @@
 	// Extract the arguments
 	String mapNameStr = (String)getRequestAttributes().get("map-name");
 
-	log.debug("Get Datagrid Map: " + mapNameStr);
+	log.debug("Get Datagrid Map: {}", mapNameStr);
 
 	//
 	// Get the Flows
diff --git a/src/main/java/net/onrc/onos/graph/GraphDBConnection.java b/src/main/java/net/onrc/onos/graph/GraphDBConnection.java
index 7938bb6..b504c4b 100644
--- a/src/main/java/net/onrc/onos/graph/GraphDBConnection.java
+++ b/src/main/java/net/onrc/onos/graph/GraphDBConnection.java
@@ -11,6 +11,8 @@
 import com.tinkerpop.blueprints.Vertex;
 import com.tinkerpop.blueprints.util.wrappers.event.EventTransactionalGraph;
 import com.tinkerpop.frames.FramedGraph;
+import com.tinkerpop.frames.FramedGraphFactory;
+import com.tinkerpop.frames.modules.gremlingroovy.GremlinGroovyModule;
 
 public class GraphDBConnection implements IDBConnection {
 	public enum Transaction {
@@ -33,6 +35,7 @@
 			.getLogger(GraphDBConnection.class);
 	private static GraphDBConnection singleton = new GraphDBConnection();
 	private static TitanGraph graph;
+	private static FramedGraphFactory factory;
 	private static FramedGraph<TitanGraph> fg;
 	private static EventTransactionalGraph<TitanGraph> eg;
 	private static String configFile;
@@ -86,7 +89,9 @@
 				graph.createKeyIndex("ipv4_address", Vertex.class);
 			}
 			graph.commit();
-			fg = new FramedGraph<TitanGraph>(graph);
+			// Make sure you reuse the factory when creating new framed graphs
+			factory = new FramedGraphFactory(new GremlinGroovyModule());
+            fg = factory.create(graph);
 			eg = new EventTransactionalGraph<TitanGraph>(graph);
 		}
 		return singleton;
@@ -97,7 +102,12 @@
 	 */
 	@Override
 	public FramedGraph<TitanGraph> getFramedGraph() {
-		return fg;
+		if (isValid()) {
+			return fg;
+		} else {
+			log.error("New FramedGraph failed");
+			return null;
+		}
 	}
 
 	/**
@@ -167,7 +177,7 @@
 		try {
 			commit();
 		} catch (Exception e) {
-			log.error("{}", e.toString());
+			log.error("close() failed with exception", e);
 		}
 	}
 }
diff --git a/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java b/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
index 40f5044..e519ec0 100644
--- a/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
+++ b/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
@@ -1,5 +1,7 @@
 package net.onrc.onos.graph;
 
+import java.util.Map;
+
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
 
 import org.slf4j.Logger;
@@ -34,11 +36,11 @@
 	}
 
 	@Override
-	public void edgeRemoved(Edge e) {
+	public void edgeRemoved(Edge e, Map<String, Object> arg1) {
 		// TODO Auto-generated method stub
 		// Fire NetMapEvents (LinkRemoved, FlowEntryRemoved, HostRemoved, PortRemoved)
 		TitanEdge edge = (TitanEdge) e;
-		log.debug("TopologyEvents: Received edge removed event: {}",edge.toString());
+		log.debug("TopologyEvents: Received edge removed event: {}",edge);
 		String label = edge.getLabel();
 		if (label.equals("link")) {
 			Vertex v = edge.getVertex(Direction.IN);
@@ -72,11 +74,11 @@
 	}
 
 	@Override
-	public void vertexRemoved(Vertex vertex) {
+	public void vertexRemoved(Vertex vertex, Map<String, Object> arg1) {
 		// TODO Auto-generated method stub
 		// Generate NetMapEvents 
 		String type = (String) vertex.getProperty("type");
-		log.debug("TopologyEvents: Received vertex removed event: {}",vertex.toString());
+		log.debug("TopologyEvents: Received vertex removed event: {}",vertex);
 		if (type.equals("port")) {
 			// port is removed...lets fire reconcile here directly for now
 			
diff --git a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
index 33280a6..b11d6d8 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
@@ -41,7 +41,6 @@
 import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryService;
 import net.onrc.onos.ofcontroller.proxyarp.BgpProxyArpManager;
 import net.onrc.onos.ofcontroller.proxyarp.IArpRequester;
-import net.onrc.onos.ofcontroller.proxyarp.IProxyArpService;
 import net.onrc.onos.ofcontroller.topology.ITopologyNetService;
 import net.onrc.onos.ofcontroller.topology.Topology;
 import net.onrc.onos.ofcontroller.topology.TopologyManager;
@@ -78,8 +77,7 @@
 
 public class BgpRoute implements IFloodlightModule, IBgpRouteService, 
 									ITopologyListener, IArpRequester,
-									IOFSwitchListener, IConfigInfoService,
-									IProxyArpService {
+									IOFSwitchListener, IConfigInfoService {
 	
 	private final static Logger log = LoggerFactory.getLogger(BgpRoute.class);
 
@@ -1019,12 +1017,12 @@
 		}
 		
 		OFMatch matchLLDP = new OFMatch();
-		matchLLDP.setDataLayerType((short)0x8942);
+		matchLLDP.setDataLayerType((short)0x88cc);
 		matchLLDP.setWildcards(matchLLDP.getWildcards() & ~ OFMatch.OFPFW_DL_TYPE);
 		fmLLDP.setMatch(matchLLDP);
 		
 		OFMatch matchBDDP = new OFMatch();
-		matchBDDP.setDataLayerType((short)0x88cc);
+		matchBDDP.setDataLayerType((short)0x8942);
 		matchBDDP.setWildcards(matchBDDP.getWildcards() & ~ OFMatch.OFPFW_DL_TYPE);
 		fmBDDP.setMatch(matchBDDP);
 		
@@ -1287,27 +1285,4 @@
 	public short getVlan() {
 		return vlan;
 	}
-
-	/*
-	 * TODO This is a hack to get the REST API to work for ProxyArpManager.
-	 * The REST API is currently tied to the Floodlight module system and we
-	 * need to separate it to allow ONOS modules to use it. For now we will 
-	 * proxy calls through to the ProxyArpManager (which is not a Floodlight 
-	 * module) through this class which is a module.
-	 */
-	@Override
-	public MACAddress getMacAddress(InetAddress ipAddress) {
-		return proxyArp.getMacAddress(ipAddress);
-	}
-
-	@Override
-	public void sendArpRequest(InetAddress ipAddress, IArpRequester requester,
-			boolean retry) {
-		proxyArp.sendArpRequest(ipAddress, requester, retry);		
-	}
-
-	@Override
-	public List<String> getMappings() {
-		return proxyArp.getMappings();
-	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/bgproute/RestClient.java b/src/main/java/net/onrc/onos/ofcontroller/bgproute/RestClient.java
index a9f2abe..9606d24 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/bgproute/RestClient.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/bgproute/RestClient.java
@@ -35,7 +35,7 @@
 				log.warn("The content received from {} is not json", str);
 			}		
 
-			BufferedReader br = new BufferedReader(new InputStreamReader((conn.getInputStream()))); 
+			BufferedReader br = new BufferedReader(new InputStreamReader((conn.getInputStream())));
 			String line;
 			while ((line = br.readLine()) != null) {
 				response.append(line);
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
index b084be6..c830d44 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
@@ -29,7 +29,7 @@
  */
 public class DeviceStorageImpl implements IDeviceStorage {
 	protected final static Logger log = LoggerFactory.getLogger(DeviceStorageImpl.class);
-	
+
 	private GraphDBOperation ope;
 
 	/***
@@ -43,8 +43,8 @@
 		} catch (TitanException e) {
 			log.error("Couldn't open graph operation", e);
 		}
-	}	
-	
+	}
+
 	/***
 	 * Finalize/close function. After you use this class, please call this method.
 	 * It will close the DB connection.
@@ -53,20 +53,20 @@
 	public void close() {
 		ope.close();
 	}
-	
+
 	/***
 	 * Finalize/close function. After you use this class, please call this method.
 	 * It will close the DB connection. This is for Java garbage collection.
 	 */
 	@Override
-	public void finalize() {
+	protected void finalize() {
 		close();
 	}
 
 	/***
 	 * This function is for adding the device into the DB.
 	 * @param device The device you want to add into the DB.
-	 * @return IDeviceObject which was added in the DB. 
+	 * @return IDeviceObject which was added in the DB.
 	 */
 	@Override
 	public IDeviceObject addDevice(IDevice device) {
@@ -82,20 +82,20 @@
 	            	obj = ope.newDevice();
 	                log.debug("Adding device {}: creating new device", device.getMACAddressString());
 	            }
-	 			
+
 	            if (obj == null) {
 	            	return null;
 	            }
-	            
+
 	            changeDeviceAttachments(device, obj);
-		        
+
 	            changeDeviceIpv4Addresses(device, obj);
-	            
+
 	 			obj.setMACAddress(device.getMACAddressString());
 	 			obj.setType("device");
 	 			obj.setState("ACTIVE");
 	 			ope.commit();
-	 			
+
 	 			break;
 	 			//log.debug("Adding device {}",device.getMACAddressString());
 			} catch (TitanException e) {
@@ -104,14 +104,14 @@
 				obj = null;
 			}
 		}
- 		
-		return obj;		
+
+		return obj;
 	}
 
 	/***
 	 * This function is for updating the Device properties.
 	 * @param device The device you want to add into the DB.
-	 * @return IDeviceObject which was added in the DB. 
+	 * @return IDeviceObject which was added in the DB.
 	 */
 	@Override
 	public IDeviceObject updateDevice(IDevice device) {
@@ -125,12 +125,13 @@
 	@Override
 	public void removeDevice(IDevice device) {
 		IDeviceObject dev;
-		
+
 		if ((dev = ope.searchDevice(device.getMACAddressString())) != null) {
 			removeDevice(dev);
 		}
 	}
-	
+
+	@Override
 	public void removeDevice(IDeviceObject deviceObject) {
 		String deviceMac = deviceObject.getMACAddress();
 
@@ -144,12 +145,12 @@
 			log.error("DeviceStorage:removeDevice mac:{} failed", deviceMac);
 		}
 	}
-	
+
 	public void removeDeviceImpl(IDeviceObject deviceObject) {
 		for (IIpv4Address ipv4AddressVertex : deviceObject.getIpv4Addresses()) {
 			ope.removeIpv4Address(ipv4AddressVertex);
 		}
-		
+
 		ope.removeDevice(deviceObject);
 	}
 
@@ -202,9 +203,9 @@
 		} catch (TitanException e) {
 			ope.rollback();
 			log.error(":addDevice mac:{} failed", device.getMACAddressString());
-		}	
+		}
 	}
-	
+
 	/***
 	 * This function is for changing the Device attachment point.
 	 * @param device The new device you want change the attachment point
@@ -218,7 +219,7 @@
 			//Check if there is the port
 			IPortObject port = ope.searchPort(HexString.toHexString(ap.getSwitchDPID()),
 					(short) ap.getPort());
-			log.debug("New Switch Port is {},{}", 
+			log.debug("New Switch Port is {},{}",
 					HexString.toHexString(ap.getSwitchDPID()), (short) ap.getPort());
 
 			if (port != null){
@@ -228,18 +229,18 @@
 					attachedPorts.remove(port);
 				} else {
 					log.debug("Adding device {}: attaching to port", device.getMACAddressString());
-					port.setDevice(obj);  
+					port.setDevice(obj);
 				}
 
-				log.debug("port number is {}", port.getNumber().toString());
-				log.debug("port desc is {}", port.getDesc());  
+				log.debug("port number is {}", port.getNumber());
+				log.debug("port desc is {}", port.getDesc());
 			}
-		}      		 
+		}
 
 		for (IPortObject port: attachedPorts) {
 			log.debug("Detaching the device {}: detaching from port", device.getMACAddressString());
 			port.removeDevice(obj);
-			
+
 			if (!obj.getAttachedPorts().iterator().hasNext()) {
 				// XXX If there are no more ports attached to the device,
 				// delete it. Otherwise we have a situation where the
@@ -262,17 +263,17 @@
   		try {
   			if ((obj = ope.searchDevice(device.getMACAddressString())) != null) {
   				changeDeviceIpv4Addresses(device, obj);
-  	            
+
               	ope.commit();
   			} else {
             	log.error(":changeDeviceIPv4Address mac:{} failed", device.getMACAddressString());
-            }		
+            }
   		} catch (TitanException e) {
   			ope.rollback();
 			log.error(":changeDeviceIPv4Address mac:{} failed due to exception {}", device.getMACAddressString(), e);
 		}
 	}
-	
+
 	private void changeDeviceIpv4Addresses(IDevice device, IDeviceObject deviceObject) {
 		List<String> dbIpv4Addresses = new ArrayList<String>();
 		List<Integer> intDbIpv4Addresses = new ArrayList<Integer>();
@@ -280,33 +281,33 @@
 			dbIpv4Addresses.add(InetAddresses.fromInteger(ipv4Vertex.getIpv4Address()).getHostAddress());
 			intDbIpv4Addresses.add(ipv4Vertex.getIpv4Address());
 		}
-		
+
 		List<String> memIpv4Addresses = new ArrayList<String>();
 		for (int addr : device.getIPv4Addresses()) {
 			memIpv4Addresses.add(InetAddresses.fromInteger(addr).getHostAddress());
 		}
-		
+
 		log.debug("Device IP addresses {}, database IP addresses {}",
 				memIpv4Addresses, dbIpv4Addresses);
-		
+
 		for (int ipv4Address : device.getIPv4Addresses()) {
 			//if (deviceObject.getIpv4Address(ipv4Address) == null) {
 			if (!intDbIpv4Addresses.contains(ipv4Address)) {
 				IIpv4Address dbIpv4Address = ope.ensureIpv4Address(ipv4Address);
-				
+
 				/*
 				IDeviceObject oldDevice = dbIpv4Address.getDevice();
 				if (oldDevice != null) {
 					oldDevice.removeIpv4Address(dbIpv4Address);
 				}
 				*/
-				
-				log.debug("Adding IP address {}", 
+
+				log.debug("Adding IP address {}",
 						InetAddresses.fromInteger(ipv4Address).getHostAddress());
 				deviceObject.addIpv4Address(dbIpv4Address);
 			}
 		}
-			
+
 		List<Integer> deviceIpv4Addresses = Arrays.asList(device.getIPv4Addresses());
 		for (IIpv4Address dbIpv4Address : deviceObject.getIpv4Addresses()) {
 			if (!deviceIpv4Addresses.contains(dbIpv4Address.getIpv4Address())) {
@@ -317,39 +318,41 @@
 			}
 		}
 	}
-	
+
 	/**
 	 * Takes an {@link OnosDevice} and adds it into the database. There is no
-	 * checking of the database data and removing old data - an 
+	 * checking of the database data and removing old data - an
 	 * {@link OnosDevice} basically corresponds to a packet we've just seen,
 	 * and we need to add that information into the database.
 	 */
 	@Override
 	public void addOnosDevice(OnosDevice onosDevice) {
 		String macAddress = HexString.toHexString(onosDevice.getMacAddress().toBytes());
-		
+
 		//if the switch port we try to attach a new device already has a link, then stop adding device
 		IPortObject portObject1 = ope.searchPort(HexString.toHexString(
 				onosDevice.getSwitchDPID()), onosDevice.getSwitchPort());
 
 		if ((portObject1 != null) && portObject1.getLinkedPorts().iterator().hasNext()) {
-			log.debug("stop adding OnosDevice: {} due to there is a link to: {}",
-					onosDevice, portObject1.getLinkedPorts().iterator().next().getPortId());
+			if (log.isDebugEnabled()) {
+				log.debug("stop adding OnosDevice: {} due to there is a link to: {}",
+						onosDevice, portObject1.getLinkedPorts().iterator().next().getPortId());
+			}
 			return;
 		}
-		
+
 		log.debug("addOnosDevice: {}", onosDevice);
 
 		try {
 			IDeviceObject device = ope.searchDevice(macAddress);
-			
+
 			if (device == null) {
 				device = ope.newDevice();
 				device.setType("device");
 				device.setState("ACTIVE");
 				device.setMACAddress(macAddress);
 			}
-			
+
 			// Check if the device has the IP address, add it if it doesn't
 			if (onosDevice.getIpv4Address() != null) {
 				boolean hasIpAddress = false;
@@ -359,7 +362,7 @@
 						break;
 					}
 				}
-				
+
 				if (!hasIpAddress) {
 					IIpv4Address ipv4Address = ope.ensureIpv4Address(onosDevice.getIpv4Address().intValue());
 					IDeviceObject oldDevice = ipv4Address.getDevice();
@@ -369,7 +372,7 @@
 					device.addIpv4Address(ipv4Address);
 				}
 			}
-			
+
 			// Check if the device has the attachment point, add it if not
 			// TODO single attachment point for now, extend to multiple later
 			String switchDpid = HexString.toHexString(onosDevice.getSwitchDPID());
@@ -388,7 +391,7 @@
 					}
 				}
 			}
-			
+
 			/*
 			for (IPortObject portObject : device.getAttachedPorts()) {
 				ISwitchObject switchObject = portObject.getSwitch();
@@ -399,14 +402,14 @@
 				}
 			}
 			*/
-			
+
 			if (!hasAttachmentPoint) {
 				IPortObject portObject = ope.searchPort(switchDpid, onosDevice.getSwitchPort());
 				if (portObject != null) {
 					portObject.setDevice(device);
 				}
 			}
-			
+
 			ope.commit();
 		}
 		catch (TitanException e) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
index a9088de..60f8e10 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
@@ -19,11 +19,11 @@
  * This is the class for storing the information of links into GraphDB
  */
 public class LinkStorageImpl implements ILinkStorage {
-	
+
 	protected final static Logger log = LoggerFactory.getLogger(LinkStorageImpl.class);
 	protected GraphDBOperation op;
 
-	
+
 	/**
 	 * Initialize the object. Open LinkStorage using given configuration file.
 	 * @param conf Path (absolute path for now) to configuration file.
@@ -39,7 +39,7 @@
 	//  Routine process should be implemented in private method.
 	//  A private method MUST NOT call commit or rollback.
 
-	
+
 	/**
 	 * Update a record in the LinkStorage in a way provided by dmop.
 	 * @param link Record of a link to be updated.
@@ -49,7 +49,7 @@
 	@Override
 	public boolean update(Link link, LinkInfo linkinfo, DM_OPERATION dmop) {
 		boolean success = false;
-		
+
 		switch (dmop) {
 		case CREATE:
 		case INSERT:
@@ -99,7 +99,7 @@
 			}
 			break;
 		}
-		
+
 		return success;
 	}
 
@@ -129,7 +129,7 @@
 	@Override
 	public boolean addLink(Link link, LinkInfo linfo) {
 		boolean success = false;
-		
+
 		try {
 			//delete the Device attachment points for the related switch and port
 			deleteDeviceOnPort(link.getSrc(),link.getSrcPort());
@@ -154,10 +154,10 @@
 			e.printStackTrace();
 			log.error("LinkStorageImpl:addLink link:{} linfo:{} failed", link, linfo);
 		}
-		
+
 		return success;
 	}
-	
+
 	/**
 	 * Update multiple records in the LinkStorage in a way provided by op.
 	 * @param links List of records to be updated.
@@ -166,13 +166,13 @@
 	@Override
 	public boolean addLinks(List<Link> links) {
 		boolean success = false;
-		
+
 		for (Link lt: links) {
 			if (! addLinkImpl(lt)) {
 				return false;
 			}
 		}
-		
+
 		try {
 			op.commit();
 			success = true;
@@ -181,7 +181,7 @@
 			e.printStackTrace();
 			log.error("LinkStorageImpl:addLinks link:s{} failed", links);
 		}
-		
+
 		return success;
 	}
 
@@ -192,9 +192,9 @@
 	@Override
 	public boolean deleteLink(Link lt) {
 		boolean success = false;
-		
+
 		log.debug("LinkStorageImpl:deleteLink(): {}", lt);
-		
+
         try {
          	if (deleteLinkImpl(lt)) {
         		op.commit();
@@ -210,7 +210,7 @@
         			new Object[]{lt, e.toString()});
         	e.printStackTrace();
         }
-        
+
         return success;
 	}
 
@@ -221,7 +221,7 @@
 	@Override
 	public boolean deleteLinks(List<Link> links) {
 		boolean success = false;
-		
+
 		try {
 			for (Link lt : links) {
 				if (! deleteLinkImpl(lt)) {
@@ -236,7 +236,7 @@
 			e.printStackTrace();
         	log.error("LinkStorageImpl:deleteLinks failed invalid vertices {}", links);
 		}
-		
+
 		return success;
 	}
 
@@ -256,7 +256,7 @@
 	    ISwitchObject srcSw = srcPort.getSwitch();
 	    if (srcSw == null)
 		return links;
-    	
+
 	    for(IPortObject dstPort : srcPort.getLinkedPorts()) {
 		ISwitchObject dstSw = dstPort.getSwitch();
 		if (dstSw != null) {
@@ -278,14 +278,14 @@
 	@Override
 	public List<Link> getReverseLinks(Long dpid, short port) {
 	    List<Link> links = new ArrayList<Link>();
-    	
+
 	    IPortObject srcPort = op.searchPort(HexString.toHexString(dpid), port);
 	    if (srcPort == null)
 		return links;
 	    ISwitchObject srcSw = srcPort.getSwitch();
 	    if (srcSw == null)
 		return links;
-    	
+
 	    for(IPortObject dstPort : srcPort.getReverseLinkedPorts()) {
 		ISwitchObject dstSw = dstPort.getSwitch();
 		if (dstSw != null) {
@@ -297,7 +297,7 @@
 	    }
 	    return links;
 	}
-	
+
 	/**
 	 * Delete records of the links connected to the port specified by given DPID and port number.
 	 * @param dpid DPID of desired port.
@@ -306,7 +306,7 @@
 	@Override
 	public boolean deleteLinksOnPort(Long dpid, short port) {
 		boolean success = false;
-		
+
 		List<Link> linksToDelete = getLinks(dpid, port);
 		try {
 			for(Link l : linksToDelete) {
@@ -323,7 +323,7 @@
 			e.printStackTrace();
         	log.error("LinkStorageImpl:deleteLinksOnPort dpid:{} port:{} failed", dpid, port);
 		}
-		
+
 		return success;
 	}
 
@@ -337,7 +337,7 @@
 		List<Link> links = new ArrayList<Link>();
 
 		ISwitchObject srcSw = op.searchSwitch(dpid);
-		
+
 		if(srcSw != null) {
 			for(IPortObject srcPort : srcSw.getPorts()) {
 				for(IPortObject dstPort : srcPort.getLinkedPorts()) {
@@ -352,7 +352,7 @@
 				}
 			}
 		}
-		
+
 		return links;
 	}
 
@@ -367,7 +367,7 @@
 		List<Link> links = new ArrayList<Link>();
 
 		ISwitchObject srcSw = op.searchSwitch(dpid);
-		
+
 		if(srcSw != null) {
 			for(IPortObject srcPort : srcSw.getPorts()) {
 				for(IPortObject dstPort : srcPort.getReverseLinkedPorts()) {
@@ -376,7 +376,7 @@
 		        		Link link = new Link(
 							HexString.toLong(dstSw.getDPID()),
 							dstPort.getNumber(),
-					
+
 							HexString.toLong(dpid),
 							srcPort.getNumber());
 		        		links.add(link);
@@ -384,7 +384,7 @@
 				}
 			}
 		}
-		
+
 		return links;
 	}
 
@@ -392,16 +392,17 @@
 	 * Get list of all links whose state is ACTIVE.
 	 * @return List of active links. Empty list if no port was found.
 	 */
+	@Override
 	public List<Link> getActiveLinks() {
 		Iterable<ISwitchObject> switches = op.getActiveSwitches();
 
-		List<Link> links = new ArrayList<Link>(); 
-		
+		List<Link> links = new ArrayList<Link>();
+
 		for (ISwitchObject srcSw : switches) {
 			for(IPortObject srcPort : srcSw.getPorts()) {
 				for(IPortObject dstPort : srcPort.getLinkedPorts()) {
 					ISwitchObject dstSw = dstPort.getSwitch();
-					
+
 					if(dstSw != null && dstSw.getState().equals("ACTIVE")) {
 						links.add(new Link(HexString.toLong(srcSw.getDPID()),
 								srcPort.getNumber(),
@@ -411,10 +412,10 @@
 				}
 			}
 		}
-		
+
 		return links;
 	}
-	
+
 	@Override
 	public LinkInfo getLinkInfo(Link link) {
 		// TODO implement this
@@ -424,7 +425,8 @@
 	/**
 	 * Finalize the object.
 	 */
-	public void finalize() {
+	@Override
+	protected void finalize() {
 		close();
 	}
 
@@ -434,7 +436,7 @@
 	@Override
 	public void close() {
 		// TODO Auto-generated method stub
-//		graph.shutdown();		
+//		graph.shutdown();
 	}
 
 	/**
@@ -444,25 +446,25 @@
 	 */
 	private boolean setLinkInfoImpl(Link link, LinkInfo linkinfo) {
 		// TODO implement this
-		
+
 		return false;
 	}
 
 	private boolean addLinkImpl(Link lt) {
 		boolean success = false;
-		
+
 		IPortObject vportSrc = null, vportDst = null;
-		
+
 		// get source port vertex
 		String dpid = HexString.toHexString(lt.getSrc());
 		short port = lt.getSrcPort();
 		vportSrc = op.searchPort(dpid, port);
-		
+
 		// get dest port vertex
 		dpid = HexString.toHexString(lt.getDst());
 		port = lt.getDstPort();
 		vportDst = op.searchPort(dpid, port);
-		            
+
 		if (vportSrc != null && vportDst != null) {
 			IPortObject portExist = null;
 			// check if the link exists
@@ -472,41 +474,41 @@
 					break;
 				}
 			}
-		
+
 			if (portExist == null) {
 				vportSrc.setLinkPort(vportDst);
 				success = true;
 			} else {
-				log.debug("LinkStorageImpl:addLinkImpl failed link exists {} {} src {} dst {}", 
+				log.debug("LinkStorageImpl:addLinkImpl failed link exists {} {} src {} dst {}",
 						new Object[]{op, lt, vportSrc, vportDst});
 			}
 		}
-		
+
 		return success;
 	}
 
 	private boolean deleteLinkImpl(Link lt) {
 		boolean success = false;
 		IPortObject vportSrc = null, vportDst = null;
-	
+
 	    // get source port vertex
 	 	String dpid = HexString.toHexString(lt.getSrc());
 	 	short port = lt.getSrcPort();
 	 	vportSrc = op.searchPort(dpid, port);
-	    
+
 	    // get dst port vertex
 	 	dpid = HexString.toHexString(lt.getDst());
 	 	port = lt.getDstPort();
 	 	vportDst = op.searchPort(dpid, port);
-	 	
+
 		// FIXME: This needs to remove all edges
-	 	if (vportSrc != null && vportDst != null) {
-	 		vportSrc.removeLink(vportDst);
-	    	log.debug("deleteLinkImpl(): deleted edges src {} dst {}", new Object[]{
-	    			lt, vportSrc, vportDst});
-	    	success = true;
-	    }
-	    
-	 	return success;
+		if (vportSrc != null && vportDst != null) {
+			vportSrc.removeLink(vportDst);
+			log.debug("deleteLinkImpl(): deleted edge {} src {} dst {}", new Object[]{
+				lt, vportSrc, vportDst});
+			success = true;
+		}
+
+		return success;
 	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
index a92ab72..9963887 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
@@ -39,7 +39,7 @@
 	 * It will close the DB connection.
 	 */
 	@Override
-	public void finalize() {
+	protected void finalize() {
 		close();
 	}
 
@@ -169,7 +169,7 @@
 			success = true;
 		} catch (Exception e) {
 			op.rollback();
-			log.error("SwitchStorage:addSwitch dpid:{} failed", dpid, e);
+			log.error("SwitchStorage:addSwitch dpid:"+dpid+" failed", e);
 		}
 
 		return success;
@@ -202,7 +202,7 @@
 		} catch (Exception e) {
 			op.rollback();
 			e.printStackTrace();
-			log.error("SwitchStorage:addSwitch dpid:{} failed", dpid, e);
+			log.error("SwitchStorage:addSwitch dpid:"+dpid+" failed", e);
 		}
 
 		return success;
@@ -253,7 +253,7 @@
 		} catch (Exception e) {
 			// TODO what type of exception is thrown when we can't commit?
 			op.rollback();
-			log.error("SwitchStorage:deactivateSwitch {} failed", dpid, e);
+			log.error("SwitchStorage:deactivateSwitch "+dpid+" failed", e);
 		}
 
 		return success;
@@ -407,7 +407,7 @@
 		if (sw != null && state != null) {
 			sw.setState(state.toString());
 			log.info("SwitchStorage:setSwitchStateImpl dpid:{} updated {}",
-					sw.getDPID(), state.toString());
+					sw.getDPID(), state);
 		}
 	}
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
index 71fecd0..5e01578 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
@@ -18,25 +18,26 @@
 import com.tinkerpop.pipes.transform.PathPipe;
 
 public class TopoLinkServiceImpl implements ITopoLinkService {
-	
+
 	protected GraphDBOperation op;
 	protected final static Logger log = LoggerFactory.getLogger(TopoLinkServiceImpl.class);
 
-	public void finalize() {
+	@Override
+	protected void finalize() {
 		close();
 	}
-	
+
 	@Override
 	public void close() {
 		op.close();
 	}
- 
+
 	@Override
 	public List<Link> getActiveLinks() {
 		op = new GraphDBOperation("");
 		op.commit(); //Commit to ensure we see latest data
 		Iterable<ISwitchObject> switches = op.getActiveSwitches();
-		List<Link> links = new ArrayList<Link>(); 
+		List<Link> links = new ArrayList<Link>();
 		for (ISwitchObject sw : switches) {
 			GremlinPipeline<Vertex, Link> pipe = new GremlinPipeline<Vertex, Link>();
 			ExtractLink extractor = new ExtractLink();
@@ -44,12 +45,12 @@
 			pipe.start(sw.asVertex());
 			pipe.enablePath(true);
 			pipe.out("on").out("link").in("on").path().step(extractor);
-					
+
 			while (pipe.hasNext() ) {
 				Link l = pipe.next();
 				links.add(l);
 			}
-						
+
 		}
 		op.commit();
 		return links;
@@ -57,7 +58,7 @@
 
 	@Override
 	public List<Link> getLinksOnSwitch(String dpid) {
-		List<Link> links = new ArrayList<Link>(); 
+		List<Link> links = new ArrayList<Link>();
 		ISwitchObject sw = op.searchSwitch(dpid);
 		GremlinPipeline<Vertex, Link> pipe = new GremlinPipeline<Vertex, Link>();
 		ExtractLink extractor = new ExtractLink();
@@ -65,7 +66,7 @@
 		pipe.start(sw.asVertex());
 		pipe.enablePath(true);
 		pipe.out("on").out("link").in("on").path().step(extractor);
-			
+
 		while (pipe.hasNext() ) {
 			Link l = pipe.next();
 			links.add(l);
@@ -74,14 +75,14 @@
 
 	}
 
-	private class ExtractLink implements PipeFunction<PathPipe<Vertex>, Link> {
+	private static class ExtractLink implements PipeFunction<PathPipe<Vertex>, Link> {
 		@Override
 		public Link compute(PathPipe<Vertex> pipe) {
 			long s_dpid = 0;
 			long d_dpid = 0;
 			short s_port = 0;
 			short d_port = 0;
-			
+
 			List<?> V = pipe.next();
 			Vertex src_sw = (Vertex)V.get(0);
 			Vertex dest_sw = (Vertex)V.get(3);
@@ -91,9 +92,9 @@
 			d_dpid = HexString.toLong((String) dest_sw.getProperty("dpid"));
 			s_port = (Short) src_port.getProperty("number");
 			d_port = (Short) dest_port.getProperty("number");
-			
+
 			Link l = new Link(s_dpid,s_port,d_dpid,d_port);
-			
+
 			return l;
 		}
 	}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
index 3a324b1..52a5817 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
@@ -9,7 +9,7 @@
 import org.slf4j.LoggerFactory;
 
 public class TopoSwitchServiceImpl implements ITopoSwitchService {
-	
+
 	private GraphDBOperation op;
 	protected final static Logger log = LoggerFactory.getLogger(TopoSwitchServiceImpl.class);
 
@@ -20,16 +20,17 @@
 	public TopoSwitchServiceImpl() {
 		this("");
 	}
-	
-	public void finalize() {
+
+	@Override
+	protected void finalize() {
 		close();
 	}
-	
+
 	@Override
 	public void close() {
 		op.close();
 	}
-	
+
 	@Override
 	public Iterable<ISwitchObject> getActiveSwitches() {
 		// TODO Auto-generated method stub
@@ -65,5 +66,5 @@
 	public IPortObject getPortOnSwitch(String dpid, short port_num) {
 		// TODO Auto-generated method stub
 		return null;
-	}	
+	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
index ea547fc..68c2b7c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
@@ -20,6 +20,7 @@
 import net.floodlightcontroller.devicemanager.IDeviceListener;
 import net.floodlightcontroller.routing.Link;
 import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.graph.GraphDBConnection;
 import net.onrc.onos.graph.GraphDBOperation;
@@ -37,7 +38,7 @@
 import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryListener;
 import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryService;
 import net.onrc.onos.ofcontroller.linkdiscovery.LinkInfo;
-import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
+import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.registry.controller.IControllerRegistryService;
 import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
@@ -384,8 +385,9 @@
 		log.debug("{}:deviceAdded(): Adding device {}",this.getClass(),device.getMACAddressString());
 		devStore.addDevice(device);
 		for (int intIpv4Address : device.getIPv4Addresses()) {
-			datagridService.sendArpRequest(
-					ArpMessage.newReply(InetAddresses.fromInteger(intIpv4Address)));
+			datagridService.sendArpReplyNotification(new ArpReplyNotification(
+					InetAddresses.fromInteger(intIpv4Address), 
+					MACAddress.valueOf(device.getMACAddress())));
 		}
 	}
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index 9969cfc..c3d7501 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -4,8 +4,6 @@
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.LinkedList;
-import java.util.List;
-
 import net.floodlightcontroller.util.MACAddress;
 
 import net.onrc.onos.graph.GraphDBOperation;
@@ -49,13 +47,13 @@
 	    String stacktrace = sw.toString();
 
 	    log.error(":addFlow FlowId:{} failed: {}",
-		      flowPath.flowId().toString(),
+		      flowPath.flowId(),
 		      stacktrace);
 	    return false;
 	}
 	if (flowObj == null) {
 	    log.error(":addFlow FlowId:{} failed: Flow object not created",
-		      flowPath.flowId().toString());
+		      flowPath.flowId());
 	    dbHandler.rollback();
 	    return false;
 	}
@@ -205,12 +203,12 @@
 	    }
 	} catch (Exception e) {
 	    log.error(":addFlow FlowEntryId:{} failed",
-		      flowEntry.flowEntryId().toString());
+		      flowEntry.flowEntryId());
 	    return null;
 	}
 	if (flowEntryObj == null) {
 	    log.error(":addFlow FlowEntryId:{} failed: FlowEntry object not created",
-		      flowEntry.flowEntryId().toString());
+		      flowEntry.flowEntryId());
 	    return null;
 	}
 
@@ -221,7 +219,7 @@
 	flowEntryObj.setFlowEntryId(flowEntry.flowEntryId().toString());
 	flowEntryObj.setType("flow_entry");
 
-	// 
+	//
 	// Set the Flow Entry Edges and attributes:
 	// - Switch edge
 	// - InPort edge
@@ -344,7 +342,7 @@
 	    flowEntryObj = dbHandler.searchFlowEntry(flowEntry.flowEntryId());
 	} catch (Exception e) {
 	    log.error(":deleteFlowEntry FlowEntryId:{} failed",
-		      flowEntry.flowEntryId().toString());
+		      flowEntry.flowEntryId());
 	    return false;
 	}
 	//
@@ -354,7 +352,7 @@
 	/*
 	if (flowEntryObj == null) {
 	    log.error(":deleteFlowEntry FlowEntryId:{} failed: FlowEntry object not found",
-		      flowEntry.flowEntryId().toString());
+		      flowEntry.flowEntryId());
 	    return false;
 	}
 	*/
@@ -373,24 +371,15 @@
      * @return true on success, otherwise false.
      */
     static boolean deleteAllFlows(GraphDBOperation dbHandler) {
-	List<FlowId> allFlowIds = new LinkedList<FlowId>();
-
 	// Get all Flow IDs
 	Iterable<IFlowPath> allFlowPaths = dbHandler.getAllFlowPaths();
 	for (IFlowPath flowPathObj : allFlowPaths) {
 	    if (flowPathObj == null)
 		continue;
-	    String flowIdStr = flowPathObj.getFlowId();
-	    if (flowIdStr == null)
-		continue;
-	    FlowId flowId = new FlowId(flowIdStr);
-	    allFlowIds.add(flowId);
-	}
 
-	// Delete all flows one-by-one
-	for (FlowId flowId : allFlowIds) {
-	    deleteFlow(dbHandler, flowId);
+	    deleteIFlowPath(dbHandler, flowPathObj);
 	}
+	dbHandler.commit();
 
 	return true;
     }
@@ -409,7 +398,7 @@
 	} catch (Exception e) {
 	    // TODO: handle exceptions
 	    dbHandler.rollback();
-	    log.error(":deleteFlow FlowId:{} failed", flowId.toString());
+	    log.error(":deleteFlow FlowId:{} failed", flowId);
 	    return false;
 	}
 	if (flowObj == null) {
@@ -417,6 +406,18 @@
 	    return true;		// OK: No such flow
 	}
 
+	deleteIFlowPath(dbHandler, flowObj);
+	dbHandler.commit();
+	return true;
+    }
+
+    /**
+     * Delete a previously added flow.
+     * @note You need to call commit after calling this method.
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowObj IFlowPath object to delete.
+     */
+    private static void deleteIFlowPath(GraphDBOperation dbHandler, IFlowPath flowObj) {
 	//
 	// Remove all Flow Entries
 	//
@@ -427,9 +428,6 @@
 	}
 	// Remove the Flow itself
 	dbHandler.removeFlowPath(flowObj);
-	dbHandler.commit();
-
-	return true;
     }
 
     /**
@@ -446,7 +444,7 @@
 	} catch (Exception e) {
 	    // TODO: handle exceptions
 	    dbHandler.rollback();
-	    log.error(":getFlow FlowId:{} failed", flowId.toString());
+	    log.error(":getFlow FlowId:{} failed", flowId);
 	    return null;
 	}
 	if (flowObj == null) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index afadaa4..7550cfd 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -31,7 +31,7 @@
 import net.onrc.onos.ofcontroller.util.FlowPathUserState;
 import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
 
-import com.esotericsoftware.kryo2.Kryo;
+import com.esotericsoftware.kryo.Kryo;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -273,6 +273,10 @@
 
 	for (FlowPath flowPath : flowPaths) {
 	    boolean isInstalled = true;
+	    
+	    if (flowPath.flowEntries().isEmpty()) {
+	    	continue;
+	    }
 
 	    //
 	    // Check whether all Flow Entries have been installed
@@ -355,8 +359,7 @@
 	for (EventEntry<FlowPath> eventEntry : flowPathEvents) {
 	    FlowPath flowPath = eventEntry.eventData();
 
-	    log.debug("Flow Event: {} {}", eventEntry.eventType(),
-		      flowPath.toString());
+	    log.debug("Flow Event: {} {}", eventEntry.eventType(), flowPath);
 
 	    switch (eventEntry.eventType()) {
 	    case ENTRY_ADD: {
@@ -440,7 +443,7 @@
 	    TopologyElement topologyElement = eventEntry.eventData();
 
 	    log.debug("Topology Event: {} {}", eventEntry.eventType(),
-		      topologyElement.toString());
+		      topologyElement);
 
 	    switch (eventEntry.eventType()) {
 	    case ENTRY_ADD:
@@ -507,7 +510,7 @@
 	    FlowEntry flowEntry = eventEntry.eventData();
 
 	    log.debug("Flow Entry Event: {} {}", eventEntry.eventType(),
-		      flowEntry.toString());
+		      flowEntry);
 
 	    if ((! flowEntry.isValidFlowId()) ||
 		(! flowEntry.isValidFlowEntryId())) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 53876d2..f266163 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -39,7 +39,7 @@
 
 import com.thinkaurelius.titan.core.TitanException;
 
-import com.esotericsoftware.kryo2.Kryo;
+import com.esotericsoftware.kryo.Kryo;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,7 +90,7 @@
      * Shutdown the Flow Manager operation.
      */
     @Override
-    public void finalize() {
+    protected void finalize() {
     	close();
     }
 
@@ -400,6 +400,11 @@
 	if (srcDpid.value() != sw.getId())
 	    return;
 	deleteFlow(flowPath.flowId());
+	
+	// Send flow deleted notification to the Forwarding module
+	// TODO This is a quick fix for flow-removed notifications. We
+	// should think more about the design of these notifications.
+	notificationFlowPathRemoved(flowPath);
     }
 
     /**
@@ -469,6 +474,20 @@
     }
 
     /**
+     * Generate a notification that a FlowPath has been removed from the 
+     * network. This means we've received an expiry message for the flow
+     * from the switch, and send flowmods to remove any remaining parts of
+     * the path.
+     * 
+     * @param flowPath FlowPath object that was removed from the network.
+     */
+    void notificationFlowPathRemoved(FlowPath flowPath) {
+	if (forwardingService != null) {
+		forwardingService.flowRemoved(flowPath);
+	}
+    }
+
+    /**
      * Push modified Flow-related state as appropriate.
      *
      * @param modifiedFlowPaths the collection of modified Flow Paths.
@@ -533,7 +552,7 @@
 		flowEntry.setFlowEntryId(new FlowEntryId(id));
 	    }
 
-	    log.debug("Pushing Flow Entry To Switch: {}", flowEntry.toString());
+	    log.debug("Pushing Flow Entry To Switch: {}", flowEntry);
 	    entries.add(new Pair<IOFSwitch, FlowEntry>(mySwitch, flowEntry));
 	}
 
@@ -579,7 +598,7 @@
 	    if (mySwitch != null)
 		continue;
 
-	    log.debug("Pushing cleanup of Flow Entry To Datagrid: {}", flowEntry.toString());
+	    log.debug("Pushing cleanup of Flow Entry To Datagrid: {}", flowEntry);
 
 	    //
 	    // Write the Flow Entry to the Datagrid
@@ -693,8 +712,7 @@
 	    //
 	    if (flowPath.flowPathUserState() ==
 		FlowPathUserState.FP_USER_DELETE) {
-		log.debug("Deleting Flow Path From Database: {}",
-			  flowPath.toString());
+		log.debug("Deleting Flow Path From Database: {}", flowPath);
 
 		boolean retry = false;
 		do {
@@ -712,7 +730,7 @@
 			retry = true;
 		    } catch (Exception e) {
 			log.error("Exception deleting Flow Path from Network MAP: {}", e);
-		    } 
+		    }
 		} while (retry);
 
 		continue;
@@ -740,7 +758,7 @@
 	    if (! allValid)
 		continue;
 
-	    log.debug("Pushing Flow Path To Database: {}", flowPath.toString());
+	    log.debug("Pushing Flow Path To Database: {}", flowPath);
 
 	    //
 	    // Write the Flow Path to the Network Map
@@ -750,9 +768,7 @@
 		retry = false;
 		try {
 		    if (! FlowDatabaseOperation.addFlow(dbHandlerInner, flowPath)) {
-			String logMsg = "Cannot write to Network Map Flow Path " +
-			    flowPath.flowId();
-			log.error(logMsg);
+			log.error("Cannot write to Network Map Flow Path {}", flowPath.flowId());
 			retry = true;
 		    }
 		} catch (TitanException te) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java
index 2a7574b..4f4c1e4 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java
@@ -51,7 +51,7 @@
 	ObjectMapper mapper = new ObjectMapper();
 	String flowPathStr = flowJson;
 	FlowPath flowPath = null;
-	log.debug("Add Flow Path: " + flowPathStr);
+	log.debug("Add Flow Path: {}", flowPathStr);
 	try {
 	    flowPath = mapper.readValue(flowPathStr, FlowPath.class);
 	} catch (JsonGenerationException e) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DeleteFlowResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DeleteFlowResource.java
index d0991e8..ab68584 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DeleteFlowResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DeleteFlowResource.java
@@ -46,7 +46,7 @@
 	    result = flowService.deleteAllFlows();
 	} else {
 	    FlowId flowId = new FlowId(flowIdStr);
-	    log.debug("Delete Flow Id: " + flowIdStr);
+	    log.debug("Delete Flow Id: {}", flowIdStr);
 	    result = flowService.deleteFlow(flowId);
 	}
 	return result;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
index 1bfb6a3..1cbeece 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
@@ -41,7 +41,7 @@
 	String flowIdStr = (String) getRequestAttributes().get("flow-id");
 	FlowId flowId = new FlowId(flowIdStr);
 
-	log.debug("Get Flow Id: " + flowIdStr);
+	log.debug("Get Flow Id: {}", flowIdStr);
 
 	result = flowService.getFlow(flowId);
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
index 9cffb56..aa31abd 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
@@ -47,8 +47,8 @@
         // Extract the arguments
     	String flowIdStr = (String) getRequestAttributes().get("flow-id");
     	String maxFlowStr = (String) getRequestAttributes().get("max-flows");
-    	log.debug("Get Summary Flows starting flow-id: " + flowIdStr + " max-flows: " + maxFlowStr);
-    	
+    	log.debug("Get Summary Flows starting flow-id: {} max-flows: {}" ,flowIdStr, maxFlowStr);
+
     	flowId = new FlowId(flowIdStr);
     	maxFlows = Integer.parseInt(maxFlowStr);
     	if (maxFlows < 0) maxFlows = 0;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index 641faaf..4915cc7 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -44,7 +44,7 @@
 				       IOFSwitchListener {
     // flag to enable FlowSynchronizer
     private static final boolean enableFlowSync = true;
-    protected static Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
+    protected static final Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
     protected volatile IFloodlightProviderService floodlightProvider;
     protected volatile IControllerRegistryService registryService;
     protected volatile IRestApiService restApi;
@@ -140,7 +140,7 @@
 	case FLOW_REMOVED:
 	    OFFlowRemoved flowMsg = (OFFlowRemoved) msg;
 	    FlowEntryId id = new FlowEntryId(flowMsg.getCookie());
-	    log.debug("Got flow entry removed from " + sw.getId() + ": " + id);
+	    log.debug("Got flow entry removed from {}: {}",sw.getId(), id);
 	    flowManager.flowEntryOnSwitchExpired(sw, id);
 	    break;
 	default:
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 4529382..f2a1828 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -505,7 +505,7 @@
 			// Unknown user state. Ignore the entry
 			log.debug(
 					"Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
-					flowEntry.flowEntryId().toString(),
+					flowEntry.flowEntryId(),
 					flowEntry.flowEntryUserState());
 			return false;
 		}
@@ -747,13 +747,16 @@
 		//
 		// Write the message to the switch
 		//
-		log.debug("Installing flow entry "
-				+ flowEntry.flowEntryUserState() + " into switch DPID: "
-				+ sw.getStringId() + " flowEntryId: "
-				+ flowEntry.flowEntryId().toString() + " srcMac: "
-				+ matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
-				+ matchInPort + " outPort: " + actionOutputPort);
-		
+		log.debug("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
+			, flowEntry.flowEntryUserState()
+			, sw.getStringId()
+			, flowEntry.flowEntryId()
+			, matchSrcMac
+			, matchDstMac
+			, matchInPort
+			, actionOutputPort
+			);
+
 		return add(sw, fm);
 	}
 	
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index 6ef44be..23bf7ae 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -102,6 +102,10 @@
 	    Set<FlowEntryWrapper> graphEntries = getFlowEntriesFromGraph();
 	    long step1 = System.nanoTime();
 	    Set<FlowEntryWrapper> switchEntries = getFlowEntriesFromSwitch();
+	    if (switchEntries == null) {
+	    	log.debug("getFlowEntriesFromSwitch() failed");
+	    	return null;
+	    }
 	    long step2 = System.nanoTime();
 	    SyncResult result = compare(graphEntries, switchEntries);
 	    long step3 = System.nanoTime();
@@ -124,14 +128,14 @@
 	    extractTime /= div;
 	    pushTime /= div;
 	    totalTime /= div;
-	    log.debug("Sync time (ms):" +
-	    		  graphIDTime + "," +
-	     		  switchTime + "," + 
-	    		  compareTime + "," +
-	     		  graphEntryTime + "," +
-	    		  extractTime + "," + 
-	     		  pushTime + "," +
-	              totalTime);
+	    log.debug("Sync time (ms):{},{},{},{},{},{},{}"
+	              , graphIDTime
+	              , switchTime
+	              , compareTime
+	              , graphEntryTime
+	              , extractTime
+	              , pushTime
+	              , totalTime);
 	}
 
 	/**
@@ -161,11 +165,14 @@
 		extractTime += entry.extractTime;
 		pushTime += entry.pushTime;
 		added++;
-	    }	  
-	    log.debug("Flow entries added "+ added + ", " +
-		      "Flow entries removed "+ removed + ", " +
-		      "Flow entries skipped " + skipped);
-	    
+	    }
+	    log.debug("Flow entries added {}, " +
+		      "Flow entries removed {}, " +
+		      "Flow entries skipped {}"
+		      , added
+		      , removed
+		      , skipped );
+
 	    return new SyncResult(added, removed, skipped);
 	}
 
@@ -213,12 +220,15 @@
 	    } catch (IOException e) {
 		// TODO Auto-generated catch block
 		e.printStackTrace();
+		return null;
 	    } catch (InterruptedException e) {
 		// TODO Auto-generated catch block
 		e.printStackTrace();
+		return null;
 	    } catch (ExecutionException e) {
 		// TODO Auto-generated catch block
 		e.printStackTrace();
+		return null;
 	    }
 
 	    Set<FlowEntryWrapper> results = new HashSet<FlowEntryWrapper>();
@@ -258,7 +268,7 @@
 	double dbTime, extractTime, pushTime;
 	public void addToSwitch(IOFSwitch sw) {
 	    if (statisticsReply != null) {
-		log.error("Error adding existing flow entry {} to sw {}", 
+		log.error("Error adding existing flow entry {} to sw {}",
 			  statisticsReply.getCookie(), sw.getId());
 		return;
 	    }
@@ -289,19 +299,19 @@
 		return;
 	    }
 	    extractTime = System.nanoTime() - startExtract;
-	    
+
 	    double startPush = System.nanoTime();
 	    pusher.pushFlowEntry(sw, flowEntry);
 	    pushTime = System.nanoTime() - startPush;
 	}
-	
+
 	/**
 	 * Remove this FlowEntry from a switch via FlowPusher.
 	 * @param sw Switch from which flow will be removed.
 	 */
 	public void removeFromSwitch(IOFSwitch sw) {
 	    if (statisticsReply == null) {
-		log.error("Error removing non-existent flow entry {} from sw {}", 
+		log.error("Error removing non-existent flow entry {} from sw {}",
 			  flowEntryId, sw.getId());
 		return;
 	    }
@@ -335,7 +345,7 @@
 	 */
 	@Override
 	public boolean equals(Object obj){
-	    if(obj.getClass() == this.getClass()) {
+	    if(obj != null && obj.getClass() == this.getClass()) {
 		FlowEntryWrapper entry = (FlowEntryWrapper) obj;
 		// TODO: we need to actually compare the match + actions
 		return this.flowEntryId.equals(entry.flowEntryId);
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 4415120..b3d9759 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -1,6 +1,5 @@
 package net.onrc.onos.ofcontroller.forwarding;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -16,7 +15,6 @@
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.packet.Ethernet;
-import net.floodlightcontroller.packet.IPv4;
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.ofcontroller.core.IDeviceStorage;
@@ -27,11 +25,13 @@
 import net.onrc.onos.ofcontroller.devicemanager.IOnosDeviceService;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
 import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
-import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
+import net.onrc.onos.ofcontroller.proxyarp.BroadcastPacketOutNotification;
+import net.onrc.onos.ofcontroller.proxyarp.IProxyArpService;
 import net.onrc.onos.ofcontroller.topology.TopologyManager;
 import net.onrc.onos.ofcontroller.util.CallerId;
 import net.onrc.onos.ofcontroller.util.DataPath;
 import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
 import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
@@ -53,7 +53,6 @@
 
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.common.net.InetAddresses;
 
 public class Forwarding implements IOFMessageListener, IFloodlightModule,
 									IForwardingService {
@@ -61,8 +60,8 @@
 
 	private final int IDLE_TIMEOUT = 5; // seconds
 	private final int HARD_TIMEOUT = 0; // seconds
-
-	private final int PATH_PUSHED_TIMEOUT = 3000; // milliseconds
+	
+	private final CallerId callerId = new CallerId("Forwarding");
 	
 	private IFloodlightProviderService floodlightProvider;
 	private IFlowService flowService;
@@ -72,7 +71,6 @@
 	private IDeviceStorage deviceStorage;
 	private TopologyManager topologyService;
 	
-	//private Map<Path, Long> pendingFlows;
 	// TODO it seems there is a Guava collection that will time out entries.
 	// We should see if this will work here.
 	private Map<Path, PushedFlow> pendingFlows;
@@ -92,31 +90,18 @@
 	
 	private class PushedFlow {
 		public final long flowId;
-		private final long pushedTime;
-		public short firstHopOutPort = OFPort.OFPP_NONE.getValue();
+		public boolean installed = false;
 		
 		public PushedFlow(long flowId) {
 			this.flowId = flowId;
-			pushedTime = System.currentTimeMillis();
-		}
-		
-		public boolean isExpired() {
-			return (System.currentTimeMillis() - pushedTime) > PATH_PUSHED_TIMEOUT;
 		}
 	}
 	
 	private final class Path {
-		public final SwitchPort srcPort;
-		public final SwitchPort dstPort;
 		public final MACAddress srcMac;
 		public final MACAddress dstMac;
 		
-		public Path(SwitchPort src, SwitchPort dst, 
-				MACAddress srcMac, MACAddress dstMac) {
-			srcPort = new SwitchPort(new Dpid(src.dpid().value()), 
-					new Port(src.port().value()));
-			dstPort = new SwitchPort(new Dpid(dst.dpid().value()), 
-					new Port(dst.port().value()));
+		public Path(MACAddress srcMac, MACAddress dstMac) {
 			this.srcMac = srcMac;
 			this.dstMac = dstMac;
 		}
@@ -128,17 +113,13 @@
 			}
 			
 			Path otherPath = (Path) other;
-			return srcPort.equals(otherPath.srcPort) && 
-					dstPort.equals(otherPath.dstPort) &&
-					srcMac.equals(otherPath.srcMac) &&
+			return srcMac.equals(otherPath.srcMac) &&
 					dstMac.equals(otherPath.dstMac);
 		}
 		
 		@Override
 		public int hashCode() {
 			int hash = 17;
-			hash = 31 * hash + srcPort.hashCode();
-			hash = 31 * hash + dstPort.hashCode();
 			hash = 31 * hash + srcMac.hashCode();
 			hash = 31 * hash + dstMac.hashCode();
 			return hash;
@@ -146,8 +127,7 @@
 		
 		@Override
 		public String toString() {
-			return "(" + srcMac + " at " + srcPort + ") => (" 
-					+ dstPort + " at " + dstMac + ")";
+			return "(" + srcMac + ") => (" + dstMac + ")";
 		}
 	}
 	
@@ -175,6 +155,9 @@
 		dependencies.add(IFlowService.class);
 		dependencies.add(IFlowPusherService.class);
 		dependencies.add(IOnosDeviceService.class);
+		// We don't use the IProxyArpService directly, but reactive forwarding
+		// requires it to be loaded and answering ARP requests
+		dependencies.add(IProxyArpService.class);
 		return dependencies;
 	}
 	
@@ -187,12 +170,8 @@
 		datagrid = context.getServiceImpl(IDatagridService.class);
 		
 		floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
-		
-		//pendingFlows = new ConcurrentHashMap<Path, Long>();
+
 		pendingFlows = new HashMap<Path, PushedFlow>();
-		//waitingPackets = Multimaps.synchronizedSetMultimap(
-				//HashMultimap.<Long, PacketToPush>create());
-		//waitingPackets = HashMultimap.create();
 		waitingPackets = LinkedListMultimap.create();
 		
 		deviceStorage = new DeviceStorageImpl();
@@ -242,7 +221,6 @@
 		
 		if (eth.isBroadcast() || eth.isMulticast()) {
 			handleBroadcast(sw, pi, eth);
-			//return Command.CONTINUE;
 		}
 		else {
 			// Unicast
@@ -256,24 +234,9 @@
 		if (log.isTraceEnabled()) {
 			log.trace("Sending broadcast packet to other ONOS instances");
 		}
-		
-		IPv4 ipv4Packet = (IPv4) eth.getPayload();
-		
-		// TODO We'll put the destination address here, because the current
-		// architecture needs an address. Addresses are only used for replies
-		// however, which don't apply to non-ARP packets. The ArpMessage class
-		// has become a bit too overloaded and should be refactored to 
-		// handle all use cases nicely.
-		 InetAddress targetAddress = 
-				InetAddresses.fromInteger(ipv4Packet.getDestinationAddress());
-		
-		// Piggy-back on the ARP mechanism to broadcast this packet out the
-		// edge. Luckily the ARP module doesn't check that the packet is
-		// actually ARP before broadcasting, so we can trick it into sending
-		// our non-ARP packets.
-		// TODO This should be refactored later to account for the new use case.
-		datagrid.sendArpRequest(ArpMessage.newRequest(targetAddress, eth.serialize(),
-				-1L, (short)-1, sw.getId(), pi.getInPort()));
+
+		 datagrid.sendPacketOutNotification(new BroadcastPacketOutNotification(
+				 eth.serialize(), sw.getId(), pi.getInPort()));
 	}
 	
 	private void handlePacketIn(IOFSwitch sw, OFPacketIn pi, Ethernet eth) {
@@ -303,7 +266,6 @@
 		long destinationDpid = HexString.toLong(switchObject.getDPID());
 		
 		// TODO SwitchPort, Dpid and Port should probably be immutable
-		// (also, are Dpid and Port are even necessary?)
 		SwitchPort srcSwitchPort = new SwitchPort(
 				new Dpid(sw.getId()), new Port(pi.getInPort())); 
 		SwitchPort dstSwitchPort = new SwitchPort(
@@ -312,26 +274,50 @@
 		MACAddress srcMacAddress = MACAddress.valueOf(eth.getSourceMACAddress());
 		MACAddress dstMacAddress = MACAddress.valueOf(eth.getDestinationMACAddress());
 		
-		
 		FlowPath flowPath, reverseFlowPath;
 		
-		Path pathspec = new Path(srcSwitchPort, dstSwitchPort, 
-				srcMacAddress, dstMacAddress);
+		Path pathspec = new Path(srcMacAddress, dstMacAddress);
 		// TODO check concurrency
 		synchronized (lock) {
 			PushedFlow existingFlow = pendingFlows.get(pathspec);
-			//Long existingFlowId = pendingFlows.get(pathspec);
-			
-			if (existingFlow != null && !existingFlow.isExpired()) {
+
+			if (existingFlow != null) {
+				// We've already installed a flow for this pair of MAC addresses
 				log.debug("Found existing flow {}", 
 						HexString.toHexString(existingFlow.flowId));
 				
 				OFPacketOut po = constructPacketOut(pi, sw);
 				
-				if (existingFlow.firstHopOutPort != OFPort.OFPP_NONE.getValue()) {
+				// Find the correct port here. We just assume the PI is from 
+				// the first hop switch, but this is definitely not always
+				// the case. We'll have to retrieve the flow from HZ every time
+				// because it could change (be rerouted) sometimes.
+				if (existingFlow.installed) {
 					// Flow has been sent to the switches so it is safe to
 					// send a packet out now
-					sendPacketOut(sw, po, existingFlow.firstHopOutPort);
+					FlowPath flow = datagrid.getFlow(new FlowId(existingFlow.flowId));
+					FlowEntry flowEntryForThisSwitch = null;
+					
+					if (flow != null) {
+						for (FlowEntry flowEntry : flow.flowEntries()) {
+							if (flowEntry.dpid().equals(new Dpid(sw.getId()))) {
+								flowEntryForThisSwitch = flowEntry;
+								break;
+							}
+						}
+					}
+					
+					if (flowEntryForThisSwitch == null) {
+						// If we don't find a flow entry for that switch, then we're
+						// in the middle of a rerouting (or something's gone wrong). 
+						// This packet will be dropped as a victim of the rerouting.
+						log.debug("Dropping packet on flow {} between {}-{}, flow path {}",
+								new Object[] {new FlowId(existingFlow.flowId),
+								srcMacAddress, dstMacAddress, flow});
+					}
+					else {
+						sendPacketOut(sw, po, flowEntryForThisSwitch.outPort().value());
+					}
 				}
 				else {
 					// Flow has not yet been sent to switches so save the
@@ -341,21 +327,16 @@
 				}
 				return;
 			}
-			
-			//log.debug("Couldn't match {} in {}", pathspec, pendingFlows);
-			
+
 			log.debug("Adding new flow between {} at {} and {} at {}",
 					new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
 			
-			
-			CallerId callerId = new CallerId("Forwarding");
-			
 			DataPath datapath = new DataPath();
 			datapath.setSrcPort(srcSwitchPort);
 			datapath.setDstPort(dstSwitchPort);
 			
 			flowPath = new FlowPath();
-			flowPath.setInstallerId(callerId);
+			flowPath.setInstallerId(new CallerId(callerId));
 	
 			flowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
 			flowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
@@ -375,7 +356,7 @@
 			
 			// TODO implement copy constructor for FlowPath
 			reverseFlowPath = new FlowPath();
-			reverseFlowPath.setInstallerId(callerId);
+			reverseFlowPath.setInstallerId(new CallerId(callerId));
 			reverseFlowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
 			reverseFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
 			reverseFlowPath.setIdleTimeout(IDLE_TIMEOUT);
@@ -387,9 +368,7 @@
 			reverseFlowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
 			reverseFlowPath.setDataPath(reverseDataPath);
 			reverseFlowPath.dataPath().srcPort().dpid().toString();
-			
-			// TODO what happens if no path exists? cleanup
-			
+
 			FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
 			FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
 			
@@ -397,50 +376,23 @@
 			reverseFlowPath.setFlowId(reverseFlowId);
 			
 			OFPacketOut po = constructPacketOut(pi, sw);
-			Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort, 
-					dstMacAddress, srcMacAddress);
+			Path reversePathSpec = new Path(dstMacAddress, srcMacAddress);
 			
 			// Add to waiting lists
-			//pendingFlows.put(pathspec, flowId.value());
-			//pendingFlows.put(reversePathSpec, reverseFlowId.value());
 			pendingFlows.put(pathspec, new PushedFlow(flowId.value()));
 			pendingFlows.put(reversePathSpec, new PushedFlow(reverseFlowId.value()));
 			waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
 		
 		}
 		
+		log.debug("Adding reverse {} to {} flowid {}", new Object[] {
+				dstMacAddress, srcMacAddress, reverseFlowPath.flowId()});
 		flowService.addFlow(reverseFlowPath);
+		log.debug("Adding forward {} to {} flowid {}", new Object[] {
+				srcMacAddress, dstMacAddress, flowPath.flowId()});
 		flowService.addFlow(flowPath);
 		
 	}
-	
-	/*
-	private boolean flowExists(SwitchPort srcPort, MACAddress srcMac, 
-			SwitchPort dstPort, MACAddress dstMac) {
-		for (FlowPath flow : datagridService.getAllFlows()) {
-			FlowEntryMatch match = flow.flowEntryMatch();
-			// TODO implement FlowEntryMatch.equals();
-			// This is painful to do properly without support in the FlowEntryMatch
-			boolean same = true;
-			if (!match.srcMac().equals(srcMac) ||
-				!match.dstMac().equals(dstMac)) {
-				same = false;
-			}
-			if (!flow.dataPath().srcPort().equals(srcPort) || 
-				!flow.dataPath().dstPort().equals(dstPort)) {
-				same = false;
-			}
-			
-			if (same) {
-				log.debug("found flow entry that's the same {}-{}:::{}-{}",
-						new Object[] {srcPort, srcMac, dstPort, dstMac});
-				return true;
-			}
-		}
-		
-		return false;
-	}
-	*/
 
 	private OFPacketOut constructPacketOut(OFPacketIn pi, IOFSwitch sw) {	
 		OFPacketOut po = new OFPacketOut();
@@ -467,10 +419,45 @@
 			flowInstalled(flowPath);
 		}
 	}
+	
+	@Override
+	public void flowRemoved(FlowPath removedFlowPath) {
+		if (!removedFlowPath.installerId().equals(callerId)) {
+			// Not our flow path, ignore
+			return;
+		}
+		
+		MACAddress srcMacAddress = removedFlowPath.flowEntryMatch().srcMac();
+		MACAddress dstMacAddress = removedFlowPath.flowEntryMatch().dstMac();
+		
+		Path removedPath = new Path(srcMacAddress, dstMacAddress);
+		
+		synchronized (lock) {
+			pendingFlows.remove(removedPath);
+			
+			// There *shouldn't* be any packets queued if the flow has 
+			// just been removed. 
+			List<PacketToPush> packets = 
+					waitingPackets.removeAll(removedFlowPath.flowId().value());
+			if (!packets.isEmpty()) {
+				log.warn("Removed flow {} has packets queued", 
+						removedFlowPath.flowId());
+			}
+		}
+	}
 
 	private void flowInstalled(FlowPath installedFlowPath) {
 		long flowId = installedFlowPath.flowId().value();
 		
+		if (!installedFlowPath.installerId().equals(callerId)) {
+			// Not our flow path, ignore
+			return;
+		}
+		
+		// TODO waiting packets should time out. We could request a path that
+		// can't be installed right now because of a network partition. The path
+		// may eventually be installed, but we may have received thousands of 
+		// packets in the meantime and probably don't want to send very old packets.
 		short outPort = 
 				installedFlowPath.flowEntries().get(0).outPort().value();
 		
@@ -479,19 +466,17 @@
 		
 		Collection<PacketToPush> packets;
 		synchronized (lock) {
-			log.debug("Flow {} has been installed, sending queued packets",
-					installedFlowPath.flowId());
-			
 			packets = waitingPackets.removeAll(flowId);
 			
+			log.debug("Flow {} has been installed, sending {} queued packets",
+					installedFlowPath.flowId(), packets.size());
+			
 			// remove pending flows entry
-			Path installedPath = new Path(installedFlowPath.dataPath().srcPort(),
-					installedFlowPath.dataPath().dstPort(),
-					srcMacAddress, dstMacAddress);
-			//pendingFlows.remove(pathToRemove);
+			Path installedPath = new Path(srcMacAddress, dstMacAddress);
 			PushedFlow existingFlow = pendingFlows.get(installedPath);
-			if (existingFlow != null)
-			    existingFlow.firstHopOutPort = outPort;
+			if (existingFlow != null) {
+			    existingFlow.installed = true;
+			}
 		}
 		
 		for (PacketToPush packet : packets) {
@@ -509,4 +494,5 @@
 		
 		flowPusher.add(sw, po);
 	}
+
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java
index e5bd714..0e0d1da 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java
@@ -22,4 +22,12 @@
 	 * been installed in the network.
 	 */
 	public void flowsInstalled(Collection<FlowPath> installedFlowPaths);
+	
+	/**
+	 * Notify the Forwarding module that a flow has expired and been 
+	 * removed from the network.
+	 * 
+	 * @param removedFlowPath The FlowPath that was removed
+	 */
+	public void flowRemoved(FlowPath removedFlowPath);
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
index 8077201..3bb2878 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
@@ -112,7 +112,7 @@
  */
 @LogMessageCategory("Network Topology")
 public class LinkDiscoveryManager
-implements IOFMessageListener, IOFSwitchListener, 
+implements IOFMessageListener, IOFSwitchListener,
 ILinkDiscoveryService, IFloodlightModule {
 	protected IFloodlightProviderService controller;
     protected final static Logger log = LoggerFactory.getLogger(LinkDiscoveryManager.class);
@@ -125,7 +125,7 @@
 
 
     // LLDP and BDDP fields
-    private static final byte[] LLDP_STANDARD_DST_MAC_STRING = 
+    private static final byte[] LLDP_STANDARD_DST_MAC_STRING =
             HexString.fromHexString("01:80:c2:00:00:0e");
     private static final long LINK_LOCAL_MASK  = 0xfffffffffff0L;
     private static final long LINK_LOCAL_VALUE = 0x0180c2000000L;
@@ -135,27 +135,27 @@
     private static final String LLDP_BSN_DST_MAC_STRING = "ff:ff:ff:ff:ff:ff";
 
 
-    // Direction TLVs are used to indicate if the LLDPs were sent 
+    // Direction TLVs are used to indicate if the LLDPs were sent
     // periodically or in response to a recieved LLDP
     private static final byte TLV_DIRECTION_TYPE = 0x73;
     private static final short TLV_DIRECTION_LENGTH = 1;  // 1 byte
     private static final byte TLV_DIRECTION_VALUE_FORWARD[] = {0x01};
     private static final byte TLV_DIRECTION_VALUE_REVERSE[] = {0x02};
-    private static final LLDPTLV forwardTLV 
+    private static final LLDPTLV forwardTLV
     = new LLDPTLV().
-    setType((byte)TLV_DIRECTION_TYPE).
-    setLength((short)TLV_DIRECTION_LENGTH).
+    setType(TLV_DIRECTION_TYPE).
+    setLength(TLV_DIRECTION_LENGTH).
     setValue(TLV_DIRECTION_VALUE_FORWARD);
 
-    private static final LLDPTLV reverseTLV 
+    private static final LLDPTLV reverseTLV
     = new LLDPTLV().
-    setType((byte)TLV_DIRECTION_TYPE).
-    setLength((short)TLV_DIRECTION_LENGTH).
+    setType(TLV_DIRECTION_TYPE).
+    setLength(TLV_DIRECTION_LENGTH).
     setValue(TLV_DIRECTION_VALUE_REVERSE);
 
     // Link discovery task details.
     protected SingletonTask discoveryTask;
-    protected final int DISCOVERY_TASK_INTERVAL = 1; 
+    protected final int DISCOVERY_TASK_INTERVAL = 1;
     protected final int LINK_TIMEOUT = 35; // original 35 secs, aggressive 5 secs
     protected final int LLDP_TO_ALL_INTERVAL = 15 ; //original 15 seconds, aggressive 2 secs.
     protected long lldpClock = 0;
@@ -206,7 +206,7 @@
     /* topology aware components are called in the order they were added to the
      * the array */
     protected ArrayList<ILinkDiscoveryListener> linkDiscoveryAware;
-    
+
     protected class LinkUpdate extends LDUpdate {
 
 		public LinkUpdate(LDUpdate old) {
@@ -263,7 +263,7 @@
      */
     protected Map<NodePortTuple, Long> broadcastDomainPortTimeMap;
 
-    /** 
+    /**
      * Get the LLDP sending period in seconds.
      * @return LLDP sending period in seconds.
      */
@@ -283,6 +283,7 @@
         return portLinks;
     }
 
+    @Override
     public Set<NodePortTuple> getSuppressLLDPsInfo() {
         return suppressLinkDiscovery;
     }
@@ -291,6 +292,7 @@
      * Add a switch port to the suppressed LLDP list.
      * Remove any known links on the switch port.
      */
+    @Override
     public void AddToSuppressLLDPs(long sw, short port)
     {
         NodePortTuple npt = new NodePortTuple(sw, port);
@@ -302,7 +304,8 @@
      * Remove a switch port from the suppressed LLDP list.
      * Discover links on that switchport.
      */
-    public void RemoveFromSuppressLLDPs(long sw, short port) 
+    @Override
+    public void RemoveFromSuppressLLDPs(long sw, short port)
     {
         NodePortTuple npt = new NodePortTuple(sw, port);
         this.suppressLinkDiscovery.remove(npt);
@@ -317,6 +320,7 @@
         return false;
     }
 
+    @Override
     public ILinkDiscovery.LinkType getLinkType(Link lt, LinkInfo info) {
         if (info.getUnicastValidTime() != null) {
             return ILinkDiscovery.LinkType.DIRECT_LINK;
@@ -326,7 +330,7 @@
         return ILinkDiscovery.LinkType.INVALID_LINK;
     }
 
-    
+
     private boolean isLinkDiscoverySuppressed(long sw, short portNumber) {
         return this.suppressLinkDiscovery.contains(new NodePortTuple(sw, portNumber));
     }
@@ -437,6 +441,7 @@
         }
     }
 
+    @Override
     public Set<Short> getQuarantinedPorts(long sw) {
         Set<Short> qPorts = new HashSet<Short>();
 
@@ -468,12 +473,12 @@
         else operation = UpdateOperation.PORT_DOWN;
 
         LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, operation));
-        
-        
+
+
         controller.publishUpdate(update);
     }
 
-    /** 
+    /**
      * Send LLDP on known ports
      */
     protected void discoverOnKnownLinkPorts() {
@@ -500,7 +505,7 @@
      */
     protected IOFSwitch addRemoteSwitch(long sw, short port) {
     	IOnosRemoteSwitch remotesw = null;
-    	
+
     	// add a switch if we have not seen it before
     	remotesw = remoteSwitches.get(sw);
 
@@ -510,26 +515,26 @@
         	remoteSwitches.put(remotesw.getId(), remotesw);
         	log.debug("addRemoteSwitch(): added fake remote sw {}", remotesw);
         }
-        
+
         // add the port if we have not seen it before
         if (remotesw.getPort(port) == null) {
         	OFPhysicalPort remoteport = new OFPhysicalPort();
         	remoteport.setPortNumber(port);
         	remoteport.setName("fake_" + port);
-        	remoteport.setConfig(0); 
+        	remoteport.setConfig(0);
         	remoteport.setState(0);
         	remotesw.setPort(remoteport);
         	log.debug("addRemoteSwitch(): added fake remote port {} to sw {}", remoteport, remotesw.getId());
         }
-        
+
         return remotesw;
     }
-    
+
     /**
      * Send link discovery message out of a given switch port.
      * The discovery message may be a standard LLDP or a modified
-     * LLDP, where the dst mac address is set to :ff.  
-     * 
+     * LLDP, where the dst mac address is set to :ff.
+     *
      * TODO: The modified LLDP will updated in the future and may
      * use a different eth-type.
      * @param sw
@@ -565,7 +570,7 @@
 
         if (isLinkDiscoverySuppressed(sw, port)) {
             /* Dont send LLDPs out of this port as suppressLLDPs set
-             * 
+             *
              */
             return;
         }
@@ -664,8 +669,7 @@
             iofSwitch.write(po, null);
             iofSwitch.flush();
         } catch (IOException e) {
-            log.error("Failure sending LLDP out port {} on switch {}",
-                      new Object[]{ port, iofSwitch.getStringId() }, e);
+            log.error("Failure sending LLDP out port "+port+" on switch "+iofSwitch.getStringId(), e);
         }
 
     }
@@ -675,7 +679,7 @@
      */
     protected void discoverOnAllPorts() {
         if (log.isTraceEnabled()) {
-            log.trace("Sending LLDP packets out of all the enabled ports on switch {}");
+            log.trace("Sending LLDP packets out of all the enabled ports on switch");
         }
         Set<Long> switches = floodlightProvider.getSwitches().keySet();
         // Send standard LLDPs
@@ -815,6 +819,16 @@
                 }
                 return Command.STOP;
             }
+            else if(sw <= remoteSwitch.getId()){
+                if (log.isTraceEnabled()) {
+                    log.trace("Getting BBDP from a different controller. myId {}: remoteId {}", myId, otherId);
+                    log.trace("and my controller id is smaller than the other, so quelching it. myPort {}: rPort {}", pi.getInPort(), remotePort);
+                  }
+                  //XXX ONOS: Fix the BDDP broadcast issue
+                  //return Command.CONTINUE;
+                  return Command.STOP;
+            }
+            /*
             else if (myId < otherId)  {
                 if (log.isTraceEnabled()) {
                     log.trace("Getting BDDP packets from a different controller" +
@@ -824,6 +838,7 @@
                 //return Command.CONTINUE;
                 return Command.STOP;
             }
+            */
         }
 
 
@@ -882,9 +897,9 @@
 
         addOrUpdateLink(lt, newLinkInfo);
 
-        // Check if reverse link exists. 
-        // If it doesn't exist and if the forward link was seen 
-        // first seen within a small interval, send probe on the 
+        // Check if reverse link exists.
+        // If it doesn't exist and if the forward link was seen
+        // first seen within a small interval, send probe on the
         // reverse link.
 
         newLinkInfo = links.get(lt);
@@ -928,8 +943,8 @@
 
     protected Command handlePacketIn(long sw, OFPacketIn pi,
                                      FloodlightContext cntx) {
-        Ethernet eth = 
-                IFloodlightProviderService.bcStore.get(cntx, 
+        Ethernet eth =
+                IFloodlightProviderService.bcStore.get(cntx,
                                                        IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
 
         if(eth.getEtherType() == Ethernet.TYPE_BSN) {
@@ -1000,8 +1015,8 @@
                 newInfo.setFirstSeenTime(oldInfo.getFirstSeenTime());
 
             if (log.isTraceEnabled()) {
-                log.trace("addOrUpdateLink: {} {}", 
-                          lt, 
+                log.trace("addOrUpdateLink: {} {}",
+                          lt,
                           (newInfo.getMulticastValidTime()!=null) ? "multicast" : "unicast");
             }
 
@@ -1034,7 +1049,7 @@
                 // Add to portNOFLinks if the unicast valid time is null
                 if (newInfo.getUnicastValidTime() == null)
                     addLinkToBroadcastDomain(lt);
-                
+
                 // ONOS: Distinguish added event separately from updated event
                 updateOperation = UpdateOperation.LINK_ADDED;
                 linkChanged = true;
@@ -1120,6 +1135,7 @@
         return linkChanged;
     }
 
+    @Override
     public Map<Long, Set<Link>> getSwitchLinks() {
         return this.switchLinks;
     }
@@ -1199,7 +1215,7 @@
 
         // ONOS: If we do not control this switch, then we should not process its port status messages
         if (!registryService.hasControl(iofSwitch.getId())) return Command.CONTINUE;
-        
+
         if (log.isTraceEnabled()) {
             log.trace("handlePortStatus: Switch {} port #{} reason {}; " +
                     "config is {} state is {}",
@@ -1226,7 +1242,7 @@
                 LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, UpdateOperation.PORT_DOWN));
                 controller.publishUpdate(update);
                 linkDeleted = true;
-                } 
+                }
             else if (ps.getReason() ==
                     (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
                 // If ps is a port modification and the port state has changed
@@ -1238,7 +1254,7 @@
                         assert(linkInfo != null);
                         Integer updatedSrcPortState = null;
                         Integer updatedDstPortState = null;
-                        if (lt.getSrc() == npt.getNodeId() && 
+                        if (lt.getSrc() == npt.getNodeId() &&
                                 lt.getSrcPort() == npt.getPortId() &&
                                 (linkInfo.getSrcPortState() !=
                                 ps.getDesc().getState())) {
@@ -1265,7 +1281,7 @@
                                                      getLinkType(lt, linkInfo),
                                                      operation));
                             controller.publishUpdate(update);
-                            
+
                             linkInfoChanged = true;
                         }
                     }
@@ -1379,9 +1395,9 @@
             lock.writeLock().unlock();
         }
     }
-    
+
     /**
-     * We don't react the port changed notifications here. we listen for 
+     * We don't react the port changed notifications here. we listen for
      * OFPortStatus messages directly. Might consider using this notifier
      * instead
      */
@@ -1390,7 +1406,7 @@
         // no-op
     }
 
-    /** 
+    /**
      * Delete links incident on a given switch port.
      * @param npt
      * @param reason
@@ -1410,7 +1426,7 @@
         }
     }
 
-    /** 
+    /**
      * Iterates through the list of links and deletes if the
      * last discovery message reception time exceeds timeout values.
      */
@@ -1431,7 +1447,7 @@
 
                 // Timeout the unicast and multicast LLDP valid times
                 // independently.
-                if ((info.getUnicastValidTime() != null) && 
+                if ((info.getUnicastValidTime() != null) &&
                         (info.getUnicastValidTime() + (this.LINK_TIMEOUT * 1000) < curTime)){
                     info.setUnicastValidTime(null);
 
@@ -1441,7 +1457,7 @@
                     // the link would be deleted, which would trigger updateClusters().
                     linkChanged = true;
                 }
-                if ((info.getMulticastValidTime()!= null) && 
+                if ((info.getMulticastValidTime()!= null) &&
                         (info.getMulticastValidTime()+ (this.LINK_TIMEOUT * 1000) < curTime)) {
                     info.setMulticastValidTime(null);
                     // if uTime is not null, then link will remain as openflow
@@ -1452,7 +1468,7 @@
                 }
                 // Add to the erase list only if the unicast
                 // time is null.
-                if (info.getUnicastValidTime() == null && 
+                if (info.getUnicastValidTime() == null &&
                         info.getMulticastValidTime() == null){
                     eraseList.add(entry.getKey());
                 } else if (linkChanged) {
@@ -1511,11 +1527,11 @@
         srcNpt = new NodePortTuple(lt.getSrc(), lt.getSrcPort());
         dstNpt = new NodePortTuple(lt.getDst(), lt.getDstPort());
 
-        if (!portBroadcastDomainLinks.containsKey(lt.getSrc()))
+        if (!portBroadcastDomainLinks.containsKey(srcNpt))
             portBroadcastDomainLinks.put(srcNpt, new HashSet<Link>());
         portBroadcastDomainLinks.get(srcNpt).add(lt);
 
-        if (!portBroadcastDomainLinks.containsKey(lt.getDst()))
+        if (!portBroadcastDomainLinks.containsKey(dstNpt))
             portBroadcastDomainLinks.put(dstNpt, new HashSet<Link>());
         portBroadcastDomainLinks.get(dstNpt).add(lt);
     }
@@ -1576,7 +1592,7 @@
 
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleServices() {
-        Collection<Class<? extends IFloodlightService>> l = 
+        Collection<Class<? extends IFloodlightService>> l =
                 new ArrayList<Class<? extends IFloodlightService>>();
         l.add(ILinkDiscoveryService.class);
         //l.add(ITopologyService.class);
@@ -1587,7 +1603,7 @@
     public Map<Class<? extends IFloodlightService>, IFloodlightService>
     getServiceImpls() {
         Map<Class<? extends IFloodlightService>,
-        IFloodlightService> m = 
+        IFloodlightService> m =
         new HashMap<Class<? extends IFloodlightService>,
         IFloodlightService>();
         // We are the class that implements the service
@@ -1597,7 +1613,7 @@
 
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
-        Collection<Class<? extends IFloodlightService>> l = 
+        Collection<Class<? extends IFloodlightService>> l =
                 new ArrayList<Class<? extends IFloodlightService>>();
         l.add(IFloodlightProviderService.class);
         l.add(IThreadPoolService.class);
@@ -1680,7 +1696,7 @@
                     log.error("Exception in LLDP send timer.", e);
                 } finally {
                     if (!shuttingDown) {
-                    	// Always reschedule link discovery if we're not 
+                    	// Always reschedule link discovery if we're not
                     	// shutting down (no chance of SLAVE role now)
                         log.trace("Rescheduling discovery task");
                         discoveryTask.reschedule(DISCOVERY_TASK_INTERVAL,
@@ -1730,7 +1746,7 @@
         if ((sw.getChannel() != null) &&
                 (SocketAddress.class.isInstance(
                                                 sw.getChannel().getRemoteAddress()))) {
-            evTopoSwitch.ipv4Addr = 
+            evTopoSwitch.ipv4Addr =
                     IPv4.toIPv4Address(((InetSocketAddress)(sw.getChannel().
                             getRemoteAddress())).getAddress().getAddress());
             evTopoSwitch.l4Port   =
@@ -1788,10 +1804,12 @@
         evTopoCluster = evHistTopologyCluster.put(evTopoCluster, action);
     }
 
+    @Override
     public boolean isAutoPortFastFeature() {
         return autoPortFastFeature;
     }
 
+    @Override
     public void setAutoPortFastFeature(boolean autoPortFastFeature) {
         this.autoPortFastFeature = autoPortFastFeature;
     }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpMessage.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpMessage.java
deleted file mode 100644
index 44b9ea0..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpMessage.java
+++ /dev/null
@@ -1,135 +0,0 @@
-package net.onrc.onos.ofcontroller.proxyarp;
-
-import java.io.Serializable;
-import java.net.InetAddress;
-import net.floodlightcontroller.util.MACAddress;
-
-// TODO This is getting very messy!!! Needs refactoring
-public class ArpMessage implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	private final Type type;
-	private final InetAddress forAddress;
-	private final byte[] packetData;
-	
-	// ARP reply message needs MAC info
-	private final MACAddress mac;
-	// Only send the ARP request message to the device attachment needs the 
-	// attachment switch and port. 
-	private final long outSwitch; 
-	private final short outPort;
-	
-	private final long inSwitch;
-	private final short inPort;
-
-	public enum Type {
-		REQUEST,
-		REPLY
-	}
-	
-	private ArpMessage(Type type, InetAddress address, byte[] eth, 
-			long outSwitch, short outPort, long inSwitch, short inPort) {
-		this.type = type;
-		this.forAddress = address;
-		this.packetData = eth;
-		this.mac = null;
-		this.outSwitch = -1;
-		this.outPort = -1;
-		this.inSwitch = inSwitch;
-		this.inPort = inPort;
-	}
-	
-	private ArpMessage(Type type, InetAddress address) {
-		this.type = type;
-		this.forAddress = address;
-		this.packetData = null;
-		this.mac = null;
-		this.outSwitch = -1;
-		this.outPort = -1;
-		
-		this.inSwitch = -1;
-		this.inPort = -1;
-	}
-	// the ARP reply message with MAC
-	private ArpMessage(Type type, InetAddress address, MACAddress mac) {
-		this.type = type;
-		this.forAddress = address;
-		this.packetData = null;
-		this.mac = mac;
-		this.outSwitch = -1;
-		this.outPort = -1;
-		
-		this.inSwitch = -1;
-		this.inPort = -1;
-	}
-	
-	// construct ARP request message with attachment switch and port
-	private ArpMessage(Type type, InetAddress address, byte[] arpRequest,
-			long outSwitch, short outPort) {
-		this.type = type;
-		this.forAddress = address;
-		this.packetData = arpRequest; 	
-		this.mac = null;
-		this.outSwitch = outSwitch; 
-		this.outPort = outPort;	
-		
-		this.inSwitch = -1;
-		this.inPort = -1;
-	}
-
-	// TODO Awful quick fix - caller has to supply dummy outSwitch and outPort
-	public static ArpMessage newRequest(InetAddress forAddress, byte[] arpRequest,
-			long outSwitch, short outPort, long inSwitch, short inPort) {
-		return new ArpMessage(Type.REQUEST, forAddress, arpRequest, 
-				outSwitch, outPort, inSwitch, inPort);
-	}
-	
-	public static ArpMessage newReply(InetAddress forAddress) {
-		return new ArpMessage(Type.REPLY, forAddress);
-	}
-	
-	//ARP reply message with MAC
-	public static ArpMessage newReply(InetAddress forAddress, MACAddress mac) {
-		return new ArpMessage(Type.REPLY, forAddress, mac);
-	}
-	
-	//ARP request message with attachment switch and port
-	public static ArpMessage newRequest(InetAddress forAddress, 
-			byte[] arpRequest, long outSwitch, short outPort ) {
-		return new ArpMessage(Type.REQUEST, forAddress, arpRequest, outSwitch, 
-				outPort);
-	}
-
-	public Type getType() {
-		return type;
-	}
-	
-	public InetAddress getAddress() {
-		return forAddress;
-	}
-	
-	public byte[] getPacket() {
-		return packetData;
-	}
-	
-	public MACAddress getMAC() {
-		return mac;
-	}
-
-	public long getOutSwitch() {
-		return outSwitch;
-	}
-
-	public short getOutPort() {
-		return outPort;
-	}
-	
-	public long getInSwitch() {
-		return inSwitch;
-	}
-
-	public short getInPort() {
-		return inPort;
-	}
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpReplyNotification.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpReplyNotification.java
new file mode 100644
index 0000000..a8afc55
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpReplyNotification.java
@@ -0,0 +1,28 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+
+import net.floodlightcontroller.util.MACAddress;
+
+public class ArpReplyNotification implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	private InetAddress targetAddress;
+	private MACAddress targetMacAddress;
+	
+	public ArpReplyNotification(InetAddress targetAddress, MACAddress targetMacAddress) {
+		this.targetAddress = targetAddress;
+		this.targetMacAddress = targetMacAddress;
+	}
+
+	public InetAddress getTargetAddress() {
+		return targetAddress;
+	}
+
+	public MACAddress getTargetMacAddress() {
+		return targetMacAddress;
+	}
+
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/BroadcastPacketOutNotification.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/BroadcastPacketOutNotification.java
new file mode 100644
index 0000000..73d2163
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/BroadcastPacketOutNotification.java
@@ -0,0 +1,34 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+/**
+ * Notification to all ONOS instances to broadcast this packet out the edge of
+ * the network. The edge is defined as any port that doesn't have a link to
+ * another switch. The one exception is the port that the packet was received
+ * on.
+ *
+ */
+public class BroadcastPacketOutNotification extends
+		PacketOutNotification {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private final long inSwitch;
+	private final short inPort;
+
+	public BroadcastPacketOutNotification(byte[] packet, long inSwitch, 
+			short inPort) {
+		super(packet);
+		
+		this.inSwitch = inSwitch;
+		this.inPort = inPort;
+	}
+
+	public long getInSwitch() {
+		return inSwitch;
+	}
+
+	public short getInPort() {
+		return inPort;
+	}
+
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
deleted file mode 100644
index 4ec32ec..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package net.onrc.onos.ofcontroller.proxyarp;
-
-public interface IArpEventHandler {
-
-	/**
-	 * Notify the ARP event handler that an ARP request has been received.
-	 * @param id The string ID of the ARP request
-	 * @param arpRequest The ARP request packet
-	 */
-	public void arpRequestNotification(ArpMessage arpMessage);
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java
new file mode 100644
index 0000000..75f1d5d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpReplyEventHandler.java
@@ -0,0 +1,5 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+public interface IArpReplyEventHandler {
+	public void arpReplyEvent(ArpReplyNotification arpReply);
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java
new file mode 100644
index 0000000..86b3728
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IPacketOutEventHandler.java
@@ -0,0 +1,18 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+/**
+ * Classes may implement this interface if they wish to subscribe to 
+ * packet out notifications from the datagrid service. Packet out notifications
+ * are used to direct other ONOS instances to send packets out particular
+ * ports under their control.
+ *
+ */
+public interface IPacketOutEventHandler {
+
+	/**
+	 * Notify the packet out event handler that an packet out notification has
+	 * been received.
+	 * @param packetOutNotification An object describing the notification
+	 */
+	public void packetOutNotification(PacketOutNotification packetOutNotification);
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/PacketOutNotification.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/PacketOutNotification.java
new file mode 100644
index 0000000..3d37d25
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/PacketOutNotification.java
@@ -0,0 +1,21 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+import java.io.Serializable;
+
+/**
+ * A PacketOutNotification contains data sent between ONOS instances that
+ * directs other instances to send a packet out a set of ports.
+ * This is an abstract base class that will be subclassed by specific
+ * types of notifications.
+ *
+ */
+public abstract class PacketOutNotification implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	protected final byte[] packet;
+
+	public PacketOutNotification(byte[] packet) {
+		this.packet = packet;
+	}
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
index 32b2e9c..ac99678 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
@@ -37,6 +37,7 @@
 import net.onrc.onos.ofcontroller.core.config.IConfigInfoService;
 import net.onrc.onos.ofcontroller.core.internal.DeviceStorageImpl;
 import net.onrc.onos.ofcontroller.core.internal.TopoSwitchServiceImpl;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
 import net.onrc.onos.ofcontroller.util.Dpid;
 import net.onrc.onos.ofcontroller.util.Port;
 import net.onrc.onos.ofcontroller.util.SwitchPort;
@@ -58,7 +59,8 @@
 import com.google.common.net.InetAddresses;
 
 public class ProxyArpManager implements IProxyArpService, IOFMessageListener,
-										IArpEventHandler, IFloodlightModule {
+										IPacketOutEventHandler, IArpReplyEventHandler, 
+										IFloodlightModule {
 	private final static Logger log = LoggerFactory.getLogger(ProxyArpManager.class);
 	
 	private final long ARP_TIMER_PERIOD = 100; //ms  
@@ -70,6 +72,7 @@
 	private IDatagridService datagrid;
 	private IConfigInfoService configService;
 	private IRestApiService restApi;
+	private IFlowPusherService flowPusher;
 	
 	private IDeviceStorage deviceStorage;
 	private volatile ITopoSwitchService topoSwitchService;
@@ -153,6 +156,7 @@
 		dependencies.add(IRestApiService.class);
 		dependencies.add(IDatagridService.class);
 		dependencies.add(IConfigInfoService.class);
+		dependencies.add(IFlowPusherService.class);
 		return dependencies;
 	}
 	
@@ -164,6 +168,7 @@
 		this.datagrid = context.getServiceImpl(IDatagridService.class);
 		this.configService = context.getServiceImpl(IConfigInfoService.class);
 		this.restApi = context.getServiceImpl(IRestApiService.class);
+		this.flowPusher = context.getServiceImpl(IFlowPusherService.class);
 		
 		//arpCache = new ArpCache();
 
@@ -181,7 +186,7 @@
 		restApi.addRestletRoutable(new ArpWebRoutable());
 		floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
 		
-		datagrid.registerArpEventHandler(this);
+		datagrid.registerPacketOutEventHandler(this);
 		
 		deviceStorage = new DeviceStorageImpl();
 		deviceStorage.init("");
@@ -226,7 +231,9 @@
 					
 					if (targetDevice != null) {
 						deviceStorage.removeDevice(targetDevice);
-						log.debug("RemoveDevice: {} due to no have not recieve the ARP reply", targetDevice.toString());
+						if (log.isDebugEnabled()) {
+							log.debug("RemoveDevice: {} due to no have not recieve the ARP reply", targetDevice);
+						}
 					}
 					
 					it.remove();
@@ -289,7 +296,7 @@
 			}
 			else if (arp.getOpCode() == ARP.OP_REPLY) {
 				handleArpReply(sw, pi, arp);
-				sendToOtherNodesReply(eth, pi);
+				sendArpReplyNotification(eth, pi);
 			}
 			
 			// Stop ARP packets here
@@ -343,7 +350,9 @@
 			}
 			
 			// We don't know the device so broadcast the request out
-			sendToOtherNodes(eth, sw.getId(), pi);
+			datagrid.sendPacketOutNotification(
+					new BroadcastPacketOutNotification(eth.serialize(), 
+							sw.getId(), pi.getInPort()));
 		}
 		else {
 			// Even if the device exists in our database, we do not reply to
@@ -351,16 +360,15 @@
 			MACAddress macAddress = MACAddress.valueOf(targetDevice.getMACAddress());
 
 			if (log.isTraceEnabled()) {
-				log.trace("The target Device Record in DB is: {} => {} from ARP request host at {}/{}", 
+				log.trace("The target Device Record in DB is: {} => {} from ARP request host at {}/{}",
 						new Object [] {
 						inetAddressToString(arp.getTargetProtocolAddress()),
-						macAddress.toString(),
+						macAddress,
 						HexString.toHexString(sw.getId()), pi.getInPort()});
 			}
 
 			// sendArpReply(arp, sw.getId(), pi.getInPort(), macAddress);
 
-			log.trace("Checking the device info from DB is still valid or not");
 			Iterable<IPortObject> outPorts = targetDevice.getAttachedPorts();	
 
 			if (!outPorts.iterator().hasNext()){
@@ -369,19 +377,26 @@
 							" - broadcasting", macAddress);
 				}
 				
-				sendToOtherNodes(eth, sw.getId(), pi);
+				datagrid.sendPacketOutNotification(
+						new BroadcastPacketOutNotification(eth.serialize(), 
+								sw.getId(), pi.getInPort()));
 			} 
 			else {
 				for (IPortObject portObject : outPorts) {
-					long outSwitch = 0;
-					short outPort = 0;
+					//long outSwitch = 0;
+					//short outPort = 0;
 
+					/*
 					if (!portObject.getLinkedPorts().iterator().hasNext()) {
 						outPort = portObject.getNumber();					
+					}*/
+					if (portObject.getLinkedPorts().iterator().hasNext()) {
+						continue;
 					}
 
+					short outPort = portObject.getNumber();
 					ISwitchObject outSwitchObject = portObject.getSwitch();
-					outSwitch = HexString.toLong(outSwitchObject.getDPID());
+					long outSwitch = HexString.toLong(outSwitchObject.getDPID());
 
 					if (log.isTraceEnabled()) {
 						log.trace("Probing device {} on port {}/{}", 
@@ -389,7 +404,9 @@
 								HexString.toHexString(outSwitch), outPort});
 					}
 					
-					sendToOtherNodes(eth, pi, outSwitch, outPort);
+					datagrid.sendPacketOutNotification(
+							new SinglePacketOutNotification(eth.serialize(), 
+									outSwitch, outPort));
 				}
 			}
 		}
@@ -515,50 +532,7 @@
 		}
 	}
 	
-	private void sendToOtherNodes(Ethernet eth, long inSwitchId, OFPacketIn pi) {
-		ARP arp = (ARP) eth.getPayload();
-		
-		if (log.isTraceEnabled()) {
-			log.trace("Sending ARP request for {} to other ONOS instances",
-					inetAddressToString(arp.getTargetProtocolAddress()));
-		}
-		
-		InetAddress targetAddress;
-		try {
-			targetAddress = InetAddress.getByAddress(arp.getTargetProtocolAddress());
-		} catch (UnknownHostException e) {
-			log.error("Unknown host", e);
-			return;
-		}
-		
-		datagrid.sendArpRequest(ArpMessage.newRequest(targetAddress, eth.serialize(),
-				-1L, (short)-1, inSwitchId, pi.getInPort()));
-	}
-	
-	//hazelcast to other ONOS instances to send the ARP packet out on outPort of outSwitch
-	private void sendToOtherNodes(Ethernet eth, OFPacketIn pi, long outSwitch, short outPort) {
-		ARP arp = (ARP) eth.getPayload();
-		
-		if (log.isTraceEnabled()) {
-			log.trace("Sending ARP request for {} to other ONOS instances with outSwitch {} ",
-					inetAddressToString(arp.getTargetProtocolAddress()), String.valueOf(outSwitch));
-		}
-		
-		InetAddress targetAddress;
-		try {
-			targetAddress = InetAddress.getByAddress(arp.getTargetProtocolAddress());
-		} catch (UnknownHostException e) {
-			log.error("Unknown host", e);
-			return;
-		}
-		
-		datagrid.sendArpRequest(ArpMessage.newRequest(targetAddress, eth.serialize(), outSwitch, outPort)); 
-		//datagrid.sendArpRequest(ArpMessage.newRequest(targetAddress, eth.serialize()));
-		
-		
-	}
-	
-	private void sendToOtherNodesReply(Ethernet eth, OFPacketIn pi) {
+	private void sendArpReplyNotification(Ethernet eth, OFPacketIn pi) {
 		ARP arp = (ARP) eth.getPayload();
 		
 		if (log.isTraceEnabled()) {
@@ -575,12 +549,14 @@
 			log.error("Unknown host", e);
 			return;
 		}
-		
-		datagrid.sendArpRequest(ArpMessage.newReply(targetAddress, mac));
-		//datagrid.sendArpReply(ArpMessage.newRequest(targetAddress, eth.serialize()));
-	
+
+		datagrid.sendArpReplyNotification(new ArpReplyNotification(targetAddress, mac));
 	}
 	
+	// This remains from the older single-instance ARP code. It used Floodlight
+	// APIs to find the edge of the network, but only worked on a single instance.
+	// We now do this using ONOS network graph APIs.
+	@Deprecated
 	private void broadcastArpRequestOutEdge(byte[] arpRequest, long inSwitch, short inPort) {
 		for (IOFSwitch sw : floodlightProvider.getSwitches().values()){
 			Collection<Short> enabledPorts = sw.getEnabledPortNumbers();
@@ -671,12 +647,7 @@
 			po.setLengthU(OFPacketOut.MINIMUM_LENGTH + actionsLength 
 					+ arpRequest.length);
 			
-			try {
-				sw.write(po, null);
-				sw.flush();
-			} catch (IOException e) {
-				log.error("Failure writing packet out to switch", e);
-			}
+			flowPusher.add(sw, po);
 		}
 		
 		if (log.isTraceEnabled()) {
@@ -710,12 +681,7 @@
 			return;
 		}
 		
-		try {
-			sw.write(po, null);
-			sw.flush();
-		} catch (IOException e) {
-			log.error("Failure writing packet out to switch", e);
-		}
+		flowPusher.add(sw, po);
 	}
 	
 	private void sendArpReply(ARP arpRequest, long dpid, short port, MACAddress targetMac) {
@@ -738,7 +704,6 @@
 			.setTargetProtocolAddress(arpRequest.getSenderProtocolAddress());
 		
 
-		
 		Ethernet eth = new Ethernet();
 		eth.setDestinationMACAddress(arpRequest.getSenderHardwareAddress())
 			.setSourceMACAddress(targetMac.toBytes())
@@ -773,12 +738,7 @@
 			return;
 		}
 		
-		try {
-			sw.write(msgList, null);
-			sw.flush();
-		} catch (IOException e) {
-			log.error("Failure writing packet out to switch", e);
-		}
+		flowPusher.add(sw, po);
 	}
 	
 	private String inetAddressToString(byte[] bytes) {
@@ -818,9 +778,6 @@
 	}
 
 	/*
-	 * IArpEventHandler methods
-	 */
-	
 	@Override
 	public void arpRequestNotification(ArpMessage arpMessage) {
 		log.debug("Received ARP notification from other instances");
@@ -842,6 +799,7 @@
 			break;
 		}
 	}
+	*/
 	
 	private void sendArpReplyToWaitingRequesters(InetAddress address, MACAddress mac) {
 		log.debug("Sending ARP reply for {} to requesters", 
@@ -874,4 +832,33 @@
 			request.dispatchReply(address, mac);
 		}
 	}
+
+	@Override
+	public void arpReplyEvent(ArpReplyNotification arpReply) {
+		log.debug("Received ARP reply notification for {}",
+				arpReply.getTargetAddress());
+		sendArpReplyToWaitingRequesters(arpReply.getTargetAddress(), 
+				arpReply.getTargetMacAddress());
+	}
+
+	@Override
+	public void packetOutNotification(
+			PacketOutNotification packetOutNotification) {
+		
+		if (packetOutNotification instanceof SinglePacketOutNotification) {
+			SinglePacketOutNotification notification = 
+					(SinglePacketOutNotification) packetOutNotification;
+			sendArpRequestOutPort(notification.packet, notification.getOutSwitch(), 
+					notification.getOutPort());
+		}
+		else if (packetOutNotification instanceof BroadcastPacketOutNotification) {
+			BroadcastPacketOutNotification notification = 
+					(BroadcastPacketOutNotification) packetOutNotification;
+			broadcastArpRequestOutMyEdge(notification.packet, 
+					notification.getInSwitch(), notification.getInPort());
+		}
+		else {
+			log.warn("Unknown packet out notification received");
+		}
+	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/SinglePacketOutNotification.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/SinglePacketOutNotification.java
new file mode 100644
index 0000000..1919d87
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/SinglePacketOutNotification.java
@@ -0,0 +1,30 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+/**
+ * Notification to another ONOS instance to send a packet out a single port.
+ *
+ */
+public class SinglePacketOutNotification extends PacketOutNotification {
+
+	private static final long serialVersionUID = 1L;
+	
+	private final long outSwitch;
+	private final short outPort;
+	
+	public SinglePacketOutNotification(byte[] packet, long outSwitch, 
+			short outPort) {
+		super(packet);
+		
+		this.outSwitch = outSwitch;
+		this.outPort = outPort;
+	}
+
+	public long getOutSwitch() {
+		return outSwitch;
+	}
+
+	public short getOutPort() {
+		return outPort;
+	}
+
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java b/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java
index be207c6..d447580 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java
@@ -82,7 +82,8 @@
     /**
      * Shutdown the Topology Manager operation.
      */
-    public void finalize() {
+    @Override
+    protected void finalize() {
 	close();
     }
 
@@ -100,7 +101,7 @@
      */
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleServices() {
-        Collection<Class<? extends IFloodlightService>> l = 
+        Collection<Class<? extends IFloodlightService>> l =
             new ArrayList<Class<? extends IFloodlightService>>();
         l.add(ITopologyNetService.class);
         return l;
@@ -112,10 +113,10 @@
      * @return the collection of implemented services.
      */
     @Override
-    public Map<Class<? extends IFloodlightService>, IFloodlightService> 
+    public Map<Class<? extends IFloodlightService>, IFloodlightService>
 			       getServiceImpls() {
         Map<Class<? extends IFloodlightService>,
-	    IFloodlightService> m = 
+	    IFloodlightService> m =
             new HashMap<Class<? extends IFloodlightService>,
 	    IFloodlightService>();
         m.put(ITopologyNetService.class, this);
@@ -128,7 +129,7 @@
      * @return the collection of modules this module depends on.
      */
     @Override
-    public Collection<Class<? extends IFloodlightService>> 
+    public Collection<Class<? extends IFloodlightService>>
                                                     getModuleDependencies() {
 	Collection<Class<? extends IFloodlightService>> l =
 	    new ArrayList<Class<? extends IFloodlightService>>();
@@ -191,6 +192,7 @@
      *
      * @return the allocated topology handler.
      */
+    @Override
     public Topology newDatabaseTopology() {
 	Topology topology = new Topology();
 	topology.readFromDatabase(dbHandler);
@@ -207,8 +209,9 @@
      *
      * @param topology the topology to release.
      */
+    @Override
     public void dropTopology(Topology topology) {
-	topology = null;
+    // nothing to do
     }
 
     /**
@@ -303,6 +306,7 @@
      * @return the data path with the computed shortest path if
      * found, otherwise null.
      */
+    @Override
     public DataPath getTopologyShortestPath(Topology topology,
 					    SwitchPort src, SwitchPort dest) {
 	return ShortestPath.getTopologyShortestPath(topology, src, dest);
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/web/RouteResource.java b/src/main/java/net/onrc/onos/ofcontroller/topology/web/RouteResource.java
index d8997dc..540c47d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/web/RouteResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/web/RouteResource.java
@@ -40,7 +40,7 @@
         String dstDpidStr = (String) getRequestAttributes().get("dst-dpid");
         String dstPortStr = (String) getRequestAttributes().get("dst-port");
 
-        log.debug( srcDpidStr + "--" + srcPortStr + "--" + dstDpidStr + "--" + dstPortStr);
+        log.debug( "{}--{}--{}--{}", srcDpidStr, srcPortStr, dstDpidStr, dstPortStr);
 
 	Dpid srcDpid = new Dpid(srcDpidStr);
 	Port srcPort = new Port(Short.parseShort(srcPortStr));
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/CallerId.java b/src/main/java/net/onrc/onos/ofcontroller/util/CallerId.java
index 0607533..a0217d4 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/CallerId.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/CallerId.java
@@ -12,6 +12,15 @@
      * Default constructor.
      */
     public CallerId() {}
+    
+    /**
+     * Copy constructor
+     * @param otherCallerId
+     */
+    public CallerId(CallerId otherCallerId) {
+    // Note: make a full copy if we change value to a mutable type
+    value = otherCallerId.value;
+    }
 
     /**
      * Constructor from a string value.
@@ -49,4 +58,20 @@
     public String toString() {
 	return value;
     }
+    
+    @Override
+    public boolean equals(Object other) {
+    if (!(other instanceof CallerId)) {
+        return false;
+    }
+    
+    CallerId otherCallerId = (CallerId) other;
+    
+    return value.equals(otherCallerId.value);
+    }
+    
+    @Override
+    public int hashCode() {
+    return value.hashCode();
+    }
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/Dpid.java b/src/main/java/net/onrc/onos/ofcontroller/util/Dpid.java
index bd91daa..81223d2 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/Dpid.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/Dpid.java
@@ -13,7 +13,7 @@
 @JsonDeserialize(using=DpidDeserializer.class)
 @JsonSerialize(using=DpidSerializer.class)
 public class Dpid {
-    static public long UNKNOWN = 0;
+    static public final long UNKNOWN = 0;
 
     private long value;
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/DpidDeserializer.java b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/DpidDeserializer.java
index fe93245..695bb1a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/DpidDeserializer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/DpidDeserializer.java
@@ -31,7 +31,7 @@
 	    String fieldname = jp.getCurrentName();
 	    if ("value".equals(fieldname)) {
 		String value = jp.getText();
-		log.debug("Fieldname: " + fieldname + " Value: " + value);
+		log.debug("Fieldname: {} Value: {}", fieldname, value);
 		dpid = new Dpid(value);
 	    }
 	}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/FlowIdDeserializer.java b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/FlowIdDeserializer.java
index f341027..e6c46aa 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/FlowIdDeserializer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/FlowIdDeserializer.java
@@ -31,7 +31,7 @@
 	    String fieldname = jp.getCurrentName();
 	    if ("value".equals(fieldname)) {
 		String value = jp.getText();
-		log.debug("Fieldname: " + fieldname + " Value: " + value);
+		log.debug("Fieldname: {} Value: {}", fieldname, value);
 		flowId = new FlowId(value);
 	    }
 	}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv4Deserializer.java b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv4Deserializer.java
index 2969e60..ff2238a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv4Deserializer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv4Deserializer.java
@@ -31,7 +31,7 @@
 	    String fieldname = jp.getCurrentName();
 	    if ("value".equals(fieldname)) {
 		String value = jp.getText();
-		log.debug("Fieldname: " + fieldname + " Value: " + value);
+		log.debug("Fieldname: {} Value: {}", fieldname, value);
 		ipv4 = new IPv4(value);
 	    }
 	}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv4NetDeserializer.java b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv4NetDeserializer.java
index b2592af..edec8ad 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv4NetDeserializer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv4NetDeserializer.java
@@ -31,7 +31,7 @@
 	    String fieldname = jp.getCurrentName();
 	    if ("value".equals(fieldname)) {
 		String value = jp.getText();
-		log.debug("Fieldname: " + fieldname + " Value: " + value);
+		log.debug("Fieldname: {} Value: {}", fieldname, value);
 		ipv4Net = new IPv4Net(value);
 	    }
 	}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv6Deserializer.java b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv6Deserializer.java
index c825377..6689024 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv6Deserializer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv6Deserializer.java
@@ -31,7 +31,7 @@
 	    String fieldname = jp.getCurrentName();
 	    if ("value".equals(fieldname)) {
 		String value = jp.getText();
-		log.debug("Fieldname: " + fieldname + " Value: " + value);
+		log.debug("Fieldname: {} Value: {}", fieldname, value);
 		ipv6 = new IPv6(value);
 	    }
 	}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv6NetDeserializer.java b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv6NetDeserializer.java
index 7191fa9..0d41c21 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv6NetDeserializer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/IPv6NetDeserializer.java
@@ -31,7 +31,7 @@
 	    String fieldname = jp.getCurrentName();
 	    if ("value".equals(fieldname)) {
 		String value = jp.getText();
-		log.debug("Fieldname: " + fieldname + " Value: " + value);
+		log.debug("Fieldname: {} Value: {}", fieldname, value);
 		ipv6Net = new IPv6Net(value);
 	    }
 	}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java
index 1355fe0..5998dcd 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java
@@ -4,7 +4,6 @@
 import java.util.TreeMap;
 
 import net.floodlightcontroller.util.MACAddress;
-import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.CallerId;
 import net.onrc.onos.ofcontroller.util.DataPath;
@@ -31,7 +30,7 @@
 import net.onrc.onos.ofcontroller.util.Switch;
 import net.onrc.onos.ofcontroller.util.SwitchPort;
 
-import com.esotericsoftware.kryo2.Kryo;
+import com.esotericsoftware.kryo.Kryo;
 
 /**
  * Class factory for allocating Kryo instances for
@@ -152,9 +151,6 @@
 	kryo.register(TopologyElement.class);
 	kryo.register(TopologyElement.Type.class);
 	kryo.register(TreeMap.class);
-	
-	//ARP message
-	kryo.register(ArpMessage.class);
 
 	return kryo;
     }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/MACAddressDeserializer.java b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/MACAddressDeserializer.java
index 5253dce..1436f28 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/MACAddressDeserializer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/MACAddressDeserializer.java
@@ -31,7 +31,7 @@
 	    String fieldname = jp.getCurrentName();
 	    if ("value".equals(fieldname)) {
 		String value = jp.getText();
-		log.debug("Fieldname: " + fieldname + " Value: " + value);
+		log.debug("Fieldname: {} Value: {}", fieldname, value);
 		mac = MACAddress.valueOf(value);
 	    }
 	}
diff --git a/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java b/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
index 82af20c..319ea48 100644
--- a/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
+++ b/src/main/java/net/onrc/onos/registry/controller/StandaloneRegistry.java
@@ -101,7 +101,7 @@
 				new HashMap<String, List<ControllerRegistryEntry>>();
 		
 		for (String strSwitch : switchCallbacks.keySet()){
-			log.debug("Swtich _{}", strSwitch);
+			log.debug("Switch _{}", strSwitch);
 			List<ControllerRegistryEntry> list = new ArrayList<ControllerRegistryEntry>();
 			list.add(new ControllerRegistryEntry(controllerId, 0));
 			
diff --git a/start-cassandra.sh b/start-cassandra.sh
index 426fa60..3e9a8d2 100755
--- a/start-cassandra.sh
+++ b/start-cassandra.sh
@@ -3,7 +3,7 @@
 # Set paths
 ONOS_HOME=`dirname $0`
 CASSANDRA_DIR=${HOME}/apache-cassandra-1.2.4
-LOGDIR=${ONOS_HOME}/ONOS/onos-logs
+LOGDIR=${ONOS_HOME}/onos-logs
 CASSANDRA_LOG=${LOGDIR}/cassandara.`hostname`.log
 
 function lotate {
diff --git a/start-onos-embedded.sh b/start-onos-embedded.sh
index c967419..7bd0b97 100755
--- a/start-onos-embedded.sh
+++ b/start-onos-embedded.sh
@@ -73,7 +73,7 @@
 
 # Create a logback file if required
   cat <<EOF_LOGBACK >${ONOS_LOGBACK}
-<configuration scan="true" debug="true">
+<configuration scan="true" scanPeriod="1 minutes" debug="true">
 <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
 <encoder>
 <pattern>%level [%logger:%thread] %msg%n</pattern>
@@ -84,12 +84,14 @@
 <file>${ONOS_LOG}</file>
 <encoder>
 <pattern>%date %level [%thread] %logger{10} [%file:%line] %msg%n</pattern>
+<immediateFlush>true</immediateFlush>
 </encoder>
 </appender>
 
 <logger name="org" level="WARN"/>
 <logger name="LogService" level="WARN"/> <!-- Restlet access logging -->
 <logger name="net.floodlightcontroller.logging" level="WARN"/>
+<logger name="com.thinkaurelius.titan" level="INFO"/>
 
 <root level="DEBUG">
 <appender-ref ref="FILE" />
diff --git a/start-onos-jacoco.sh b/start-onos-jacoco.sh
index 2d1adf4..b721e9e 100755
--- a/start-onos-jacoco.sh
+++ b/start-onos-jacoco.sh
@@ -97,6 +97,7 @@
 <logger name="org" level="WARN"/>
 <logger name="LogService" level="WARN"/> <!-- Restlet access logging -->
 <logger name="net.floodlightcontroller.logging" level="WARN"/>
+<logger name="com.thinkaurelius.titan" level="INFO"/>
 
 <root level="DEBUG">
 <appender-ref ref="FILE" />
diff --git a/start-onos.sh b/start-onos.sh
index 1eb6831..899cd34 100755
--- a/start-onos.sh
+++ b/start-onos.sh
@@ -19,6 +19,7 @@
 ## If you want JaCoCo Code Coverage reports... uncomment line below
 #JVM_OPTS="$JVM_OPTS -javaagent:${ONOS_HOME}/lib/jacocoagent.jar=dumponexit=true,output=file,destfile=${LOGDIR}/jacoco.exec"
 JVM_OPTS="$JVM_OPTS -server -d64"
+#JVM_OPTS="$JVM_OPTS -XX:+TieredCompilation -XX:InitialCodeCacheSize=512m -XX:ReservedCodeCacheSize=512m"
 #JVM_OPTS="$JVM_OPTS -Xmx2g -Xms2g -Xmn800m"
 JVM_OPTS="$JVM_OPTS -Xmx1g -Xms1g -Xmn800m"
 #JVM_OPTS="$JVM_OPTS -XX:+UseParallelGC -XX:+AggressiveOpts -XX:+UseFastAccessorMethods"
@@ -78,7 +79,7 @@
 # Create a logback file if required
   if [ ! -f ${ONOS_LOGBACK} ]; then
     cat <<EOF_LOGBACK >${ONOS_LOGBACK}
-<configuration scan="true" debug="true">
+<configuration scan="true" scanPeriod="1 minutes" debug="true">
 <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
 <encoder>
 <pattern>%level [%logger:%thread] %msg%n</pattern>
@@ -89,12 +90,14 @@
 <file>${ONOS_LOG}</file>
 <encoder>
 <pattern>%date %level [%thread] %logger{10} [%file:%line] %msg%n</pattern>
+<immediateFlush>true</immediateFlush>
 </encoder>
 </appender>
 
 <logger name="org" level="WARN"/>
 <logger name="LogService" level="WARN"/> <!-- Restlet access logging -->
 <logger name="net.floodlightcontroller.logging" level="WARN"/>
+<logger name="com.thinkaurelius.titan" level="INFO"/>
 
 <root level="DEBUG">
 <appender-ref ref="FILE" />
diff --git a/start-rest.sh b/start-rest.sh
index 910eb41..94bbd46 100755
--- a/start-rest.sh
+++ b/start-rest.sh
@@ -57,6 +57,10 @@
 function start {
     lotate $REST_LOG 10 
     cd $WEBDIR
+    # Make log dir for iperf log files
+    if [ ! -d  log ]; then
+      mkdir log
+    fi
     $restscript > $REST_LOG 2>&1 &
 }
 
diff --git a/test-network/mininet/dev_network_edge.py b/test-network/mininet/dev_network_edge.py
index 29be6ad..acc5d0d 100755
--- a/test-network/mininet/dev_network_edge.py
+++ b/test-network/mininet/dev_network_edge.py
@@ -113,7 +113,6 @@
     #  controllers.append(rc)
 
     #net.controllers=controllers
-    net.build()
 
     host = []
     for i in range (NR_NODES):
diff --git a/test-network/mininet/dev_network_edge_2.py b/test-network/mininet/dev_network_edge_2.py
index 50dc00c..553b376 100755
--- a/test-network/mininet/dev_network_edge_2.py
+++ b/test-network/mininet/dev_network_edge_2.py
@@ -113,7 +113,6 @@
     #  controllers.append(rc)
 
     #net.controllers=controllers
-    net.build()
 
     host = []
     for i in range (NR_NODES):
diff --git a/web/js/onos-topology.js b/web/js/onos-topology.js
index 3772127..d086f6a 100644
--- a/web/js/onos-topology.js
+++ b/web/js/onos-topology.js
@@ -222,8 +222,23 @@
 	return changed
     }
 
+    function nr_active_sw(){
+        var n=0; 
+        var nodes = force.nodes();
+        for(var i=0;i<nodes.length;i++){
+          if(nodes[i].group!=0)
+            n++;
+        }; 
+        return n;
+    }
+
     function draw(force, path, circle, text){
 	force.stop();
+        svg.append("svg:text")
+	    .attr("x", 50)
+	    .attr("y", 20)
+            .text(function(){return "Switch: " + force.nodes().length + " (Active: " + nr_active_sw()  + ")/ Link: " + force.links().length});
+
         path.enter().append("svg:path")
 	    .attr("class", function(d) { return "link"; })
 	    .attr("marker-end", function(d) {
diff --git a/web/onos-topology.html b/web/onos-topology.html
index 5834521..b4e5097 100644
--- a/web/onos-topology.html
+++ b/web/onos-topology.html
@@ -43,7 +43,7 @@
       <path d="M0,-5L10,0L0,5" fill="red" stroke="red"/>
     </marker>
   </defs>
-<h1>ONOS Sprint 4 Demo GUI</h1>
+<h1>ONOS Simple Topology GUI</h1>
 <h2>Controller Status</h2>
 <div id="servers"></div>
 <div id="onos-status"></div>
diff --git a/web/topology_rest.py b/web/topology_rest.py
index 38dba5d..0b0ab01 100755
--- a/web/topology_rest.py
+++ b/web/topology_rest.py
@@ -8,8 +8,8 @@
 import io
 import time
 import random
-
 import re
+from urllib2 import Request, urlopen, URLError, HTTPError
 
 from flask import Flask, json, Response, render_template, make_response, request
 
@@ -114,132 +114,131 @@
 ## Proxy ##
 @app.route("/proxy/gui/link/<cmd>/<src_dpid>/<src_port>/<dst_dpid>/<dst_port>")
 def proxy_link_change(cmd, src_dpid, src_port, dst_dpid, dst_port):
+  url = "%s/gui/link/%s/%s/%s/%s/%s" % (ONOS_GUI3_CONTROL_HOST, cmd, src_dpid, src_port, dst_dpid, dst_port)
   try:
-    command = "curl -s %s/gui/link/%s/%s/%s/%s/%s" % (ONOS_GUI3_CONTROL_HOST, cmd, src_dpid, src_port, dst_dpid, dst_port)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
   except:
-    print "REST IF has issue"
-    exit
+    result = ""
+    print "REST IF has issue %s" % url
 
   resp = Response(result, status=200, mimetype='application/json')
   return resp
 
 @app.route("/proxy/gui/switchctrl/<cmd>")
 def proxy_switch_controller_setting(cmd):
+  url = "%s/gui/switchctrl/%s" % (ONOS_GUI3_CONTROL_HOST, cmd)
   try:
-    command = "curl -s %s/gui/switchctrl/%s" % (ONOS_GUI3_CONTROL_HOST, cmd)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
   except:
-    print "REST IF has issue"
-    exit
+    result = ""
+    print "REST IF has issue %s" % url
 
   resp = Response(result, status=200, mimetype='application/json')
   return resp
 
 @app.route("/proxy/gui/switch/<cmd>/<dpid>")
 def proxy_switch_status_change(cmd, dpid):
+  url = "%s/gui/switch/%s/%s" % (ONOS_GUI3_CONTROL_HOST, cmd, dpid)
   try:
-    command = "curl -s %s/gui/switch/%s/%s" % (ONOS_GUI3_CONTROL_HOST, cmd, dpid)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
   except:
-    print "REST IF has issue"
-    exit
+    result = ""
+    print "REST IF has issue %s" % url
 
   resp = Response(result, status=200, mimetype='application/json')
   return resp
 
 @app.route("/proxy/gui/controller/<cmd>/<controller_name>")
 def proxy_controller_status_change(cmd, controller_name):
+  url = "%s/gui/controller/%s/%s" % (ONOS_GUI3_CONTROL_HOST, cmd, controller_name)
   try:
-    command = "curl -s %s/gui/controller/%s/%s" % (ONOS_GUI3_CONTROL_HOST, cmd, controller_name)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
   except:
-    print "REST IF has issue"
-    exit
-
+    result = ""
+    print "REST IF has issue %s" % url
+ 
   resp = Response(result, status=200, mimetype='application/json')
   return resp
 
 @app.route("/proxy/gui/addflow/<src_dpid>/<src_port>/<dst_dpid>/<dst_port>/<srcMAC>/<dstMAC>")
 def proxy_add_flow(src_dpid, src_port, dst_dpid, dst_port, srcMAC, dstMAC):
   try:
-    command = "curl -s %s/gui/addflow/%s/%s/%s/%s/%s/%s" % (ONOS_GUI3_CONTROL_HOST, src_dpid, src_port, dst_dpid, dst_port, srcMAC, dstMAC)
-    print command
-    result = os.popen(command).read()
+    url = "%s/gui/addflow/%s/%s/%s/%s/%s/%s" % (ONOS_GUI3_CONTROL_HOST, src_dpid, src_port, dst_dpid, dst_port, srcMAC, dstMAC)
+    #print "proxy gui addflow " + url
+    (code, result) = get_json(url)
   except:
-    print "REST IF has issue"
-    exit
+    print "REST IF has issue %s" % url
+    print "Result %s" % result
+    exit()
 
   resp = Response(result, status=200, mimetype='application/json')
   return resp
 
 @app.route("/proxy/gui/delflow/<flow_id>")
 def proxy_del_flow(flow_id):
+  url = "%s/gui/delflow/%s" % (ONOS_GUI3_CONTROL_HOST, flow_id)
   try:
-    command = "curl -s %s/gui/delflow/%s" % (ONOS_GUI3_CONTROL_HOST, flow_id)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
   except:
-    print "REST IF has issue"
-    exit
+    result = ""
+    print "REST IF has issue %s" % url
 
   resp = Response(result, status=200, mimetype='application/json')
   return resp
 
 @app.route("/proxy/gui/iperf/start/<flow_id>/<duration>/<samples>")
 def proxy_iperf_start(flow_id,duration,samples):
+  url = "%s/gui/iperf/start/%s/%s/%s" % (ONOS_GUI3_CONTROL_HOST, flow_id, duration, samples)
   try:
-    command = "curl -m 40 -s %s/gui/iperf/start/%s/%s/%s" % (ONOS_GUI3_CONTROL_HOST, flow_id, duration, samples)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
   except:
-    print "REST IF has issue"
-    exit
+    result = ""
+    print "REST IF has issue %s" % url
 
   resp = Response(result, status=200, mimetype='application/json')
   return resp
 
 @app.route("/proxy/gui/iperf/rate/<flow_id>")
 def proxy_iperf_rate(flow_id):
+  url = "%s/gui/iperf/rate/%s" % (ONOS_GUI3_CONTROL_HOST, flow_id)
   try:
-    command = "curl -s %s/gui/iperf/rate/%s" % (ONOS_GUI3_CONTROL_HOST, flow_id)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
   except:
-    print "REST IF has issue"
-    exit
+    result = ""
+    print "REST IF has issue %s" % url
 
   resp = Response(result, status=200, mimetype='application/json')
   return resp
 
 @app.route("/proxy/gui/reset")
 def proxy_gui_reset():
-  result = ""
+  url = "%s/gui/reset" % (ONOS_GUI3_CONTROL_HOST)
   try:
-    command = "curl -m 300 -s %s/gui/reset" % (ONOS_GUI3_CONTROL_HOST)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
   except:
-    print "REST IF has issue"
-    exit
+    result = ""
+    print "REST IF has issue %s" % url
 
   resp = Response(result, status=200, mimetype='application/json')
   return resp
 
 @app.route("/proxy/gui/scale")
 def proxy_gui_scale():
-  result = ""
+  url = "%s/gui/scale" % (ONOS_GUI3_CONTROL_HOST)
   try:
-    command = "curl -m 300 -s %s/gui/scale" % (ONOS_GUI3_CONTROL_HOST)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
   except:
-    print "REST IF has issue"
-    exit
+    result = ""
+    print "REST IF has issue %s" % url
 
   resp = Response(result, status=200, mimetype='application/json')
   return resp
@@ -247,19 +246,20 @@
 ###### ONOS REST API ##############################
 ## Worker Func ###
 def get_json(url):
-  code = 200
+  code = 200;
   try:
-    command = "curl -m 60 -s %s" % (url)
-    result = os.popen(command).read()
-    parsedResult = json.loads(result)
-    if type(parsedResult) == 'dict' and parsedResult.has_key('code'):
-      print "REST %s returned code %s" % (command, parsedResult['code'])
-      code=500
-  except:
-    print "REST IF %s has issue" % command
+    response = urlopen(url)
+  except URLError, e:
+    print "get_json: REST IF %s has issue. Reason: %s" % (url, e.reason)
     result = ""
-    code = 500
+    return (500, result)
+  except HTTPError, e:
+    print "get_json: REST IF %s has issue. Code %s" % (url, e.code)
+    result = ""
+    return (e.code, result)
 
+  result = response.read()
+#  parsedResult = json.loads(result)
   return (code, result)
 
 def pick_host():
@@ -354,14 +354,13 @@
 @app.route('/topology', methods=['GET'])
 def topology_for_gui():
   try:
-    command = "curl -s \'http://%s:%s/wm/onos/topology/switches/all/json\'" % (RestIP, RestPort)
-    result = os.popen(command).read()
+    url="http://%s:%s/wm/onos/topology/switches/all/json" % (RestIP, RestPort)
+    (code, result) = get_json(url)
     parsedResult = json.loads(result)
   except:
-    log_error("REST IF has issue: %s" % command)
+    log_error("REST IF has issue: %s" % url)
     log_error("%s" % result)
     return
-#    sys.exit(0)
 
   topo = {}
   switches = []
@@ -382,11 +381,11 @@
       switches.append(sw)
 
   try:
-    command = "curl -s \'http://%s:%s/wm/onos/registry/switches/json\'" % (RestIP, RestPort)
-    result = os.popen(command).read()
+    url="http://%s:%s/wm/onos/registry/switches/json" % (RestIP, RestPort)
+    (code, result) = get_json(url)
     parsedResult = json.loads(result)
   except:
-    log_error("REST IF has issue: %s" % command)
+    log_error("REST IF has issue: %s" % url)
     log_error("%s" % result)
 
   for key in parsedResult:
@@ -397,35 +396,12 @@
       if switches[sw_id]['group'] != 0:
         switches[sw_id]['group'] = controllers.index(ctrl) + 1
 
-#  try:
-#    v1 = "00:00:00:00:00:0a:0d:00"
-#    v1 = "00:00:00:00:00:0d:00:d1"
-#    p1=1
-#    v2 = "00:00:00:00:00:0b:0d:03"
-#    v2 = "00:00:00:00:00:0d:00:d3"
-#    p2=1
-#    command = "curl -s http://%s:%s/wm/onos/topology/route/%s/%s/%s/%s/json" % (RestIP, RestPort, v1, p1, v2, p2)
-#    result = os.popen(command).read()
-#    parsedResult = json.loads(result)
-#  except:
-#    log_error("No route")
-#    parsedResult = {}
-
-  #path = []
-  #if parsedResult.has_key('flowEntries'):
-  #  flowEntries= parsedResult['flowEntries']
-  #  for i, v in enumerate(flowEntries):
-  #    if i < len(flowEntries) - 1:
-  #      sdpid= flowEntries[i]['dpid']['value']
-  #      ddpid = flowEntries[i+1]['dpid']['value']
-  #      path.append( (sdpid, ddpid))
-
   try:
-    command = "curl -s \'http://%s:%s/wm/onos/topology/links/json\'" % (RestIP, RestPort)
-    result = os.popen(command).read()
+    url = "http://%s:%s/wm/onos/topology/links/json" % (RestIP, RestPort)
+    (code, result) = get_json(url)
     parsedResult = json.loads(result)
   except:
-    log_error("REST IF has issue: %s" % command)
+    log_error("REST IF has issue: %s" % url)
     log_error("%s" % result)
     return
 #    sys.exit(0)
@@ -457,19 +433,16 @@
   resp = Response(js, status=200, mimetype='application/json')
   return resp
 
-#@app.route("/wm/floodlight/topology/toporoute/00:00:00:00:00:a1/2/00:00:00:00:00:c1/3/json")
-#@app.route("/wm/floodlight/topology/toporoute/<srcdpid>/<srcport>/<destdpid>/<destport>/json")
 @app.route("/wm/floodlight/topology/toporoute/<v1>/<p1>/<v2>/<p2>/json")
 def shortest_path(v1, p1, v2, p2):
   try:
-    command = "curl -s \'http://%s:%s/wm/onos/topology/switches/all/json\'" % (RestIP, RestPort)
-    result = os.popen(command).read()
+    url = "http://%s:%s/wm/onos/topology/switches/all/json" % (RestIP, RestPort)
+    (code, result) = get_json(url)
     parsedResult = json.loads(result)
   except:
     log_error("REST IF has issue: %s" % command)
     log_error("%s" % result)
     return
-#    sys.exit(0)
 
   topo = {}
   switches = []
@@ -494,30 +467,28 @@
       switches.append(sw)
 
   try:
-    command = "curl -s http://%s:%s/wm/onos/topology/route/%s/%s/%s/%s/json" % (RestIP, RestPort, v1, p1, v2, p2)
-    result = os.popen(command).read()
+    url = "http://%s:%s/wm/onos/topology/route/%s/%s/%s/%s/json" % (RestIP, RestPort, v1, p1, v2, p2)
+    (code, result) = get_json(url)
     parsedResult = json.loads(result)
   except:
     log_error("No route")
     parsedResult = []
-#    exit(1)
 
   path = [];
   for i, v in enumerate(parsedResult):
     if i < len(parsedResult) - 1:
-      sdpid= parsedResult[i]['switch']
-      ddpid = parsedResult[i+1]['switch']
+      sdpid= parsedResult['flowEntries'][i]['dpid']['value']
+      ddpid= parsedResult['flowEntries'][i+1]['dpid']['value']
       path.append( (sdpid, ddpid))
 
   try:
-    command = "curl -s \'http://%s:%s/wm/onos/topology/links/json\'" % (RestIP, RestPort)
-    result = os.popen(command).read()
+    url = "http://%s:%s/wm/onos/topology/links/json" % (RestIP, RestPort)
+    (code, result) = get_json(url)
     parsedResult = json.loads(result)
   except:
     log_error("REST IF has issue: %s" % command)
     log_error("%s" % result)
     return
-#    sys.exit(0)
 
   for v in parsedResult:
     link = {}
@@ -548,13 +519,11 @@
 @app.route("/wm/floodlight/core/controller/switches/json")
 def query_switch():
   try:
-    command = "curl -s \'http://%s:%s/wm/onos/topology/switches/all/json\'" % (RestIP, RestPort)
-#    http://localhost:8080/wm/onos/topology/switches/active/json
-    print command
-    result = os.popen(command).read()
+    url = "http://%s:%s/wm/onos/topology/switches/all/json" % (RestIP, RestPort)
+    (code, result) = get_json(url)
     parsedResult = json.loads(result)
   except:
-    log_error("REST IF has issue: %s" % command)
+    log_error("REST IF has issue: %s" % url)
     log_error("%s" % result)
     return
 #    sys.exit(0)
@@ -577,49 +546,6 @@
   resp = Response(js, status=200, mimetype='application/json')
   return resp
 
-@app.route("/wm/floodlight/device/")
-def devices():
-  try:
-    command = "curl -s http://%s:%s/graphs/%s/vertices\?key=type\&value=device" % (RestIP, RestPort, DBName)
-    result = os.popen(command).read()
-    parsedResult = json.loads(result)['results']
-  except:
-    log_error("REST IF has issue: %s" % command)
-    log_error("%s" % result)
-    return
-#    sys.exit(0)
-
-  devices = []
-  for v in parsedResult:
-    dl_addr = v['dl_addr']
-    nw_addr = v['nw_addr']
-    vertex = v['_id']
-    mac = []
-    mac.append(dl_addr)
-    ip = []
-    ip.append(nw_addr)
-    device = {}
-    device['entryClass']="DefaultEntryClass"
-    device['mac']=mac
-    device['ipv4']=ip
-    device['vlan']=[]
-    device['lastSeen']=0
-    attachpoints =[]
-
-    port, dpid = deviceV_to_attachpoint(vertex)
-    attachpoint = {}
-    attachpoint['port']=port
-    attachpoint['switchDPID']=dpid
-    attachpoints.append(attachpoint)
-    device['attachmentPoint']=attachpoints
-    devices.append(device)
-
-  js = json.dumps(devices)
-  resp = Response(js, status=200, mimetype='application/json')
-  return resp
-
-#{"entityClass":"DefaultEntityClass","mac":["7c:d1:c3:e0:8c:a3"],"ipv4":["192.168.2.102","10.1.10.35"],"vlan":[],"attachmentPoint":[{"port":13,"switchDPID":"00:01:00:12:e2:78:32:44","errorStatus":null}],"lastSeen":1357333593496}
-
 ## return fake stat for now
 @app.route("/wm/floodlight/core/switch/<switchId>/<statType>/json")
 def switch_stat(switchId, statType):
@@ -638,68 +564,19 @@
     resp = Response(js, status=200, mimetype='application/json')
     return resp
 
-
-@app.route("/wm/onos/linkdiscovery/links/json")
-def query_links():
-  try:
-    command = 'curl -s http://%s:%s/graphs/%s/vertices?key=type\&value=port' % (RestIP, RestPort, DBName)
-    print command
-    result = os.popen(command).read()
-    parsedResult = json.loads(result)['results']
-  except:
-    log_error("REST IF has issue: %s" % command)
-    log_error("%s" % result)
-    return
-#    sys.exit(0)
-
-  debug("query_links %s" % command)
-#  pp.pprint(parsedResult)
-  sport = []
-  links = []
-  for v in parsedResult:
-    srcport = v['_id']
-    try:
-      command = "curl -s http://%s:%s/graphs/%s/vertices/%d/out?_label=link" % (RestIP, RestPort, DBName, srcport)
-      print command
-      result = os.popen(command).read()
-      linkResults = json.loads(result)['results']
-    except:
-      log_error("REST IF has issue: %s" % command)
-      log_error("%s" % result)
-      return
-#      sys.exit(0)
-
-    for p in linkResults:
-      if p.has_key('type') and p['type'] == "port":
-        dstport = p['_id']
-        (sport, sdpid) = portV_to_port_dpid(srcport)
-        (dport, ddpid) = portV_to_port_dpid(dstport)
-        link = {}
-        link["src-switch"]=sdpid
-        link["src-port"]=sport
-        link["src-port-state"]=0
-        link["dst-switch"]=ddpid
-        link["dst-port"]=dport
-        link["dst-port-state"]=0
-        link["type"]="internal"
-        links.append(link)
-
-#  pp.pprint(links)
-  js = json.dumps(links)
-  resp = Response(js, status=200, mimetype='application/json')
-  return resp
-
 @app.route("/controller_status")
 def controller_status():
-#  onos_check="ssh -i ~/.ssh/onlabkey.pem %s ONOS/start-onos.sh status | awk '{print $1}'"
-  onos_check="cd; onos status | grep %s | awk '{print $2}'"
-  #cassandra_check="ssh -i ~/.ssh/onlabkey.pem %s ONOS/start-cassandra.sh status"
+  url= "http://%s:%d/wm/onos/registry/controllers/json" % (RestIP, RestPort)
+  (code, result) = get_json(url)
+  parsedResult = json.loads(result)
 
   cont_status=[]
   for i in controllers:
     status={}
-    onos=os.popen(onos_check % i).read()[:-1]
-#    onos=os.popen(onos_check % (i, i.lower())).read()[:-1]
+    if i in parsedResult:
+      onos=1
+    else:
+      onos=0
     status["name"]=i
     status["onos"]=onos
     status["cassandra"]=0
@@ -709,6 +586,7 @@
   resp = Response(js, status=200, mimetype='application/json')
   return resp
 
+
 ### Command ###
 @app.route("/gui/controller/<cmd>/<controller_name>")
 def controller_status_change(cmd, controller_name):
@@ -989,17 +867,16 @@
 #http://localhost:9000/gui/iperf/start/<flow_id>/<duration>
 @app.route("/gui/iperf/start/<flow_id>/<duration>/<samples>")
 def iperf_start(flow_id,duration,samples):
+  url = "http://%s:%s/wm/onos/flows/get/%s/json" % (RestIP, RestPort, flow_id)
   try:
-    command = "curl -s \'http://%s:%s/wm/onos/flows/get/%s/json\'" % (RestIP, RestPort, flow_id)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
     if len(result) == 0:
       print "No Flow found"
       return "Flow %s not found" % (flow_id);
   except:
-    print "REST IF has issue"
-    return "REST IF has issue"
-    exit
+    print "REST IF has issue %s" % url
+    return "REST IF has issue %s" % url
 
   parsedResult = json.loads(result)
 
@@ -1054,15 +931,14 @@
 #http://localhost:9000/gui/iperf/rate/<flow_id>
 @app.route("/gui/iperf/rate/<flow_id>")
 def iperf_rate(flow_id):
+  url = "http://%s:%s/wm/onos/flows/get/%s/json" % (RestIP, RestPort, flow_id)
   try:
-    command = "curl -s \'http://%s:%s/wm/onos/flows/get/%s/json\'" % (RestIP, RestPort, flow_id)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
     if len(result) == 0:
-      resp = Response(result, status=400, mimetype='text/html')
-      return "no such iperf flow (flowid %s)" % flow_id;
+      return "no such iperf flow (flowid %s)" % flow_id
   except:
-    print "REST IF has issue"
+    print "REST IF has issue %s" % url
     exit
 
   parsedResult = json.loads(result)
@@ -1082,12 +958,13 @@
     else:
       host = controllers[hostid-1]
 
+  url="http://%s:%s/log/iperfsvr_%s.out" % (host, 9000, flow_id)
   try:
-    command = "curl -s http://%s:%s/log/iperfsvr_%s.out" % (host, 9000, flow_id)
-    print command
-    result = os.popen(command).read()
+    response = urlopen(url)
+    result = response.read()
   except:
-    exit
+    print "REST IF has issue %s" % url
+    return 
 
   if re.match("Cannot", result):
     resp = Response(result, status=400, mimetype='text/html')
@@ -1101,25 +978,24 @@
   read_config()
   read_link_def()
   if len(sys.argv) > 1 and sys.argv[1] == "-d":
-#      add_flow("00:00:00:00:00:00:02:02", 1, "00:00:00:00:00:00:03:02", 1, "00:00:00:00:02:02", "00:00:00:00:03:0c")
-#     link_change("up", "00:00:00:00:ba:5e:ba:11", 1, "00:00:00:00:00:00:00:00", 1)
-#     link_change("down", "00:00:20:4e:7f:51:8a:35", 1, "00:00:00:00:00:00:00:00", 1)
-#     link_change("up", "00:00:00:00:00:00:02:03", 1, "00:00:00:00:00:00:00:00", 1)
-#     link_change("down", "00:00:00:00:00:00:07:12", 1, "00:00:00:00:00:00:00:00", 1)
-#    print "-- query all switches --"
-#    query_switch()
-#    print "-- query topo --"
-#    topology_for_gui()
-#    link_change(1,2,3,4)
-    print "-- query all links --"
-#    query_links()
-#    print "-- query all devices --"
-#    devices()
-#    iperf_start(1,10,15)
-#    iperf_rate(1)
-#    switches()
-#    add_flow(1,2,3,4,5,6)
-    reset_demo()
+    # for debugging
+    #add_flow("00:00:00:00:00:00:02:02", 1, "00:00:00:00:00:00:03:02", 1, "00:00:00:00:02:02", "00:00:00:00:03:0c")
+    #proxy_link_change("up", "00:00:00:00:ba:5e:ba:11", 1, "00:00:00:00:00:00:00:00", 1)
+    #proxy_link_change("down", "00:00:20:4e:7f:51:8a:35", 1, "00:00:00:00:00:00:00:00", 1)
+    #proxy_link_change("up", "00:00:00:00:00:00:02:03", 1, "00:00:00:00:00:00:00:00", 1)
+    #proxy_link_change("down", "00:00:00:00:00:00:07:12", 1, "00:00:00:00:00:00:00:00", 1)
+    #print "-- query all switches --"
+    #query_switch()
+    #print "-- query topo --"
+    #topology_for_gui()
+    ##print "-- query all links --"
+    ##query_links()
+    #print "-- query all devices --"
+    #devices()
+    #links()
+    #switches()
+    #reset_demo()
+    pass
   else:
     app.debug = True
     app.run(threaded=True, host="0.0.0.0", port=9000)