Merge pull request #453 from jonohart/master

ARP broadcasting and reply notifications
diff --git a/conf/hazelcast.xml b/conf/hazelcast.xml
index 267cd58..11bef59 100644
--- a/conf/hazelcast.xml
+++ b/conf/hazelcast.xml
@@ -99,4 +99,7 @@
     </join>
   </network>
 
+  <properties>
+    <property name="hazelcast.logging.type">slf4j</property>
+  </properties>
 </hazelcast>
diff --git a/scripts/parse-classpath.py b/scripts/parse-classpath.py
new file mode 100755
index 0000000..2e60a1d
--- /dev/null
+++ b/scripts/parse-classpath.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+"""Print the Java classpath described by the Eclipse .classpath file.
+
+The paths of the 'output' and 'var' entries are joined with ':' and printed
+to stdout, with M2_REPO replaced by the local Maven repository path.
+"""
+
+import sys
+from os import path
+from xml.dom import minidom
+
+# Location of the .classpath file relative to this script
+classpath_filename = "../.classpath"
+# Path substituted for M2_REPO in classpath entries ('~' is expanded)
+m2_repository = "~/.m2/repository"
+
+
+def parse_classpath_xml(classpath_file, abs_m2_repository):
+    """Print the ':'-separated classpath read from a .classpath XML file.
+
+    classpath_file    -- file name or open file object to parse
+    abs_m2_repository -- path substituted for M2_REPO in each entry
+    """
+    xmldoc = minidom.parse(classpath_file)
+    entries = []
+    for entry in xmldoc.getElementsByTagName('classpathentry'):
+        kind = entry.attributes['kind'].value
+        if kind == "output" or kind == "var":
+            cp_entry = entry.attributes['path'].value
+            entries.append(cp_entry.replace("M2_REPO", abs_m2_repository))
+
+    # print(...) with one argument works on both Python 2 and Python 3
+    print(":".join(entries))
+
+
+if __name__ == "__main__":
+    abs_m2_repository = path.expanduser(m2_repository)
+
+    # Resolve the .classpath file relative to this script's real location
+    classpath_file = path.abspath(path.join(
+        path.dirname(path.realpath(sys.argv[0])), classpath_filename))
+
+    try:
+        with open(classpath_file) as f:
+            parse_classpath_xml(f, abs_m2_repository)
+    except IOError:
+        # Errors go to stdout on purpose: titan/gremlin.sh captures this
+        # script's output and checks it for the first message below
+        print("Error reading classpath file from %s" % classpath_file)
+        print("- Please check path is correct then run 'mvn eclipse:eclipse' to generate file")
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 04e001e..180cbe9 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -18,6 +18,7 @@
 import net.floodlightcontroller.restserver.IRestApiService;
 import net.onrc.onos.datagrid.web.DatagridWebRoutable;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
+import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
 import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
@@ -81,7 +82,7 @@
     
     // State related to the ARP map
     protected static final String arpMapName = "arpMap";
-    private IMap<byte[], byte[]> arpMap = null;
+    private IMap<ArpMessage, byte[]> arpMap = null;
     private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
     private final byte[] dummyByte = {0};
 
@@ -338,13 +339,13 @@
-     *  - Key: Request ID (String)
-     *  - Value: ARP request packet (byte[])
+     *  - Key: ARP message (ArpMessage)
+     *  - Value: Unused placeholder (byte[])
      */
-    class ArpMapListener implements EntryListener<byte[], byte[]> {
+    class ArpMapListener implements EntryListener<ArpMessage, byte[]> {
 		/**
 		 * Receive a notification that an entry is added.
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		public void entryAdded(EntryEvent<byte[], byte[]> event) {
+		public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
 		    for (IArpEventHandler arpEventHandler : arpEventHandlers) {
 		    	arpEventHandler.arpRequestNotification(event.getKey());
 		    }
@@ -367,7 +368,7 @@
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+		public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
 			// Not used
 		}
 	
@@ -376,7 +377,7 @@
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+		public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
 			// Not used
 		}
 	
@@ -385,7 +386,7 @@
 		 *
 		 * @param event the notification event for the entry.
 		 */
-		public void entryEvicted(EntryEvent<byte[], byte[]> event) {
+		public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
 		    // Not used
 		}
     }
@@ -866,8 +867,11 @@
     }
     
     @Override
-    public void sendArpRequest(byte[] arpRequest) {
-    	log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
-     	arpMap.putAsync(arpRequest, dummyByte, 1L, TimeUnit.MILLISECONDS);
+    public void sendArpRequest(ArpMessage arpMessage) {
+    	log.debug("Sending ARP message: type {}, address {}",
+    			arpMessage.getType(), arpMessage.getAddress());
+    	// The message itself is the map key; dummyByte is only a placeholder value.
+    	// NOTE(review): the 1 ms TTL presumably lets the entry expire right after listeners are notified - confirm
+     	arpMap.putAsync(arpMessage, dummyByte, 1L, TimeUnit.MILLISECONDS);
     }
 }
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 3f40480..9361341 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -4,6 +4,7 @@
 
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
+import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
 import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
@@ -153,5 +154,5 @@
-     * Send an ARP request to other ONOS instances
-     * @param arpRequest The request packet to send
+     * Send an ARP message to other ONOS instances
+     * @param arpMessage The ARP message to send
      */
-    public void sendArpRequest(byte[] arpRequest);
+    public void sendArpRequest(ArpMessage arpMessage);
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
index 1f39a38..3a324b1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
@@ -53,7 +53,13 @@
 
 	@Override
 	public Iterable<IPortObject> getPortsOnSwitch(String dpid) {
-		// TODO Auto-generated method stub
-		return null;
+		// Commit (via op.close()) so that the lookup below sees the latest data
+		op.close();
+
+		final ISwitchObject sw = op.searchSwitch(dpid);
+		if (sw == null) {
+			return null;
+		}
+		return sw.getPorts();
 	}
 
diff --git a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
index 6364a2f..104032b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
@@ -13,6 +13,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.net.InetAddresses;
+
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.IOFSwitch;
 import net.floodlightcontroller.core.IOFSwitchListener;
@@ -26,7 +28,6 @@
 import net.floodlightcontroller.devicemanager.IDeviceService;
 import net.floodlightcontroller.routing.Link;
 import net.floodlightcontroller.threadpool.IThreadPoolService;
-
 import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.graph.GraphDBConnection;
 import net.onrc.onos.graph.GraphDBOperation;
@@ -45,6 +46,7 @@
 import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryListener;
 import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryService;
 import net.onrc.onos.ofcontroller.linkdiscovery.LinkInfo;
+import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
 import net.onrc.onos.registry.controller.IControllerRegistryService;
 import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
@@ -372,6 +374,12 @@
 	public void deviceAdded(IDevice device) {
 		log.debug("{}:deviceAdded(): Adding device {}",this.getClass(),device.getMACAddressString());
 		devStore.addDevice(device);
+		// Announce every IPv4 address of the new device to the other ONOS
+		// instances, as an ArpMessage created by newReply
+		for (int intIpv4Address : device.getIPv4Addresses()) {
+			datagridService.sendArpRequest(
+					ArpMessage.newReply(InetAddresses.fromInteger(intIpv4Address)));
+		}
 	}
 
 	@Override
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpMessage.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpMessage.java
new file mode 100644
index 0000000..53891ef
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ArpMessage.java
@@ -0,0 +1,78 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import net.onrc.onos.ofcontroller.util.SwitchPort;
+
+/**
+ * A message describing an ARP request or an ARP reply, created through
+ * {@link #newRequest} or {@link #newReply}.
+ */
+public class ArpMessage implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The kinds of ARP message. */
+	public enum Type {
+		REQUEST,
+		REPLY
+	}
+
+	private final Type type;
+	private final InetAddress forAddress;
+	private final byte[] packetData;
+
+	// NOTE(review): not used anywhere in this class - confirm it is needed
+	private final List<SwitchPort> switchPorts = new ArrayList<SwitchPort>();
+
+	/** Creates a message with every field supplied by the caller. */
+	private ArpMessage(Type type, InetAddress address, byte[] eth) {
+		this.type = type;
+		this.forAddress = address;
+		this.packetData = eth;
+	}
+
+	/** Creates a message that has no packet data. */
+	private ArpMessage(Type type, InetAddress address) {
+		this(type, address, null);
+	}
+
+	/**
+	 * Creates a message of type REQUEST.
+	 *
+	 * @param forAddress the address the message is for
+	 * @param arpRequest the packet data to carry
+	 * @return the new message
+	 */
+	public static ArpMessage newRequest(InetAddress forAddress, byte[] arpRequest) {
+		return new ArpMessage(Type.REQUEST, forAddress, arpRequest);
+	}
+
+	/**
+	 * Creates a message of type REPLY, without packet data.
+	 *
+	 * @param forAddress the address the message is for
+	 * @return the new message
+	 */
+	public static ArpMessage newReply(InetAddress forAddress) {
+		return new ArpMessage(Type.REPLY, forAddress);
+	}
+
+	/** @return the type of this message */
+	public Type getType() {
+		return type;
+	}
+
+	/** @return the address this message is for */
+	public InetAddress getAddress() {
+		return forAddress;
+	}
+
+	/** @return the packet data; null if created by newReply */
+	public byte[] getPacket() {
+		return packetData;
+	}
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
index 51d92bd..4ec32ec 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
@@ -7,5 +7,5 @@
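-	 * @param id The string ID of the ARP request
-	 * @param arpRequest The ARP request packet
+	 * @param arpMessage The ARP message that was received; its type tells
+	 *        whether it is a request or a reply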
 	 */
-	public void arpRequestNotification(byte[] arpRequest);
+	public void arpRequestNotification(ArpMessage arpMessage);
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
index 426a455..85fd618 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
@@ -17,6 +17,7 @@
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.IOFMessageListener;
 import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.devicemanager.IDevice;
 import net.floodlightcontroller.packet.ARP;
 import net.floodlightcontroller.packet.Ethernet;
 import net.floodlightcontroller.packet.IPv4;
@@ -27,10 +28,13 @@
 import net.onrc.onos.ofcontroller.bgproute.Interface;
 import net.onrc.onos.ofcontroller.core.IDeviceStorage;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IDeviceObject;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoLinkService;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoSwitchService;
 import net.onrc.onos.ofcontroller.core.config.IConfigInfoService;
 import net.onrc.onos.ofcontroller.core.internal.DeviceStorageImpl;
+import net.onrc.onos.ofcontroller.core.internal.TopoLinkServiceImpl;
+import net.onrc.onos.ofcontroller.core.internal.TopoSwitchServiceImpl;
 
 import org.openflow.protocol.OFMessage;
 import org.openflow.protocol.OFPacketIn;
@@ -63,7 +67,7 @@
 	private IRestApiService restApi;
 	
 	private IDeviceStorage deviceStorage;
-	private ITopoSwitchService topoSwitchService;
+	private volatile ITopoSwitchService topoSwitchService;
 	private ITopoLinkService topoLinkService;
 	
 	private short vlan;
@@ -141,6 +145,9 @@
 
 		arpRequests = Multimaps.synchronizedSetMultimap(
 				HashMultimap.<InetAddress, ArpRequest>create());
+		
+		topoSwitchService = new TopoSwitchServiceImpl();
+		topoLinkService = new TopoLinkServiceImpl();
 	}
 	
 	public void startUp() {
@@ -475,11 +482,21 @@
 	
 	private void sendToOtherNodes(Ethernet eth, OFPacketIn pi) {
 		ARP arp = (ARP) eth.getPayload();
+		// Forward the ARP packet in eth to the other ONOS instances through the datagrid
 		if (log.isTraceEnabled()) {
 			log.trace("Sending ARP request for {} to other ONOS instances",
-					HexString.toHexString(arp.getTargetProtocolAddress()));
+					inetAddressToString(arp.getTargetProtocolAddress()));
 		}
-		datagrid.sendArpRequest(eth.serialize());
+		// The message is sent for the ARP target protocol address
+		InetAddress targetAddress;
+		try {
+			targetAddress = InetAddress.getByAddress(arp.getTargetProtocolAddress());
+		} catch (UnknownHostException e) {
+			log.error("Unknown host", e);
+			return;
+		}
+		// The serialized Ethernet frame travels with the message as its packet data
+		datagrid.sendArpRequest(ArpMessage.newRequest(targetAddress, eth.serialize()));
 	}
 	
 	private void broadcastArpRequestOutEdge(byte[] arpRequest, long inSwitch, short inPort) {
@@ -530,6 +547,44 @@
 		}
 	}
 	
+	/**
+	 * Sends a packet out of every edge port (a port that has no linked
+	 * ports) of each switch returned by the Floodlight provider.
+	 *
+	 * @param arpRequest the serialized packet to send
+	 */
+	private void broadcastArpRequestOutMyEdge(byte[] arpRequest) {
+		for (IOFSwitch sw : floodlightProvider.getSwitches().values()) {
+			Iterable<IPortObject> ports 
+				= topoSwitchService.getPortsOnSwitch(sw.getStringId());
+			if (ports == null) {
+				continue;
+			}
+			List<OFAction> actions = new ArrayList<OFAction>();
+			for (IPortObject portObject : ports) {
+				if (!portObject.getLinkedPorts().iterator().hasNext()) {
+					actions.add(new OFActionOutput(portObject.getNumber()));
+				}
+			}
+			if (actions.isEmpty()) {
+				continue; // no edge ports: the packet-out would output nothing
+			}
+			short actionsLength = (short) 
+					(actions.size() * OFActionOutput.MINIMUM_LENGTH);
+			OFPacketOut po = new OFPacketOut();
+			po.setInPort(OFPort.OFPP_NONE).setBufferId(-1).setPacketData(arpRequest);
+			po.setActions(actions);
+			po.setActionsLength(actionsLength);
+			po.setLengthU(OFPacketOut.MINIMUM_LENGTH + actionsLength 
+					+ arpRequest.length);
+			try {
+				sw.write(po, null);
+				sw.flush();
+			} catch (IOException e) {
+				log.error("Failure writing packet out to switch", e);
+			}
+		}
+	}
 	private void sendArpRequestOutPort(byte[] arpRequest, long dpid, short port) {
 		if (log.isTraceEnabled()) {
 			log.trace("Sending ARP request out {}/{}", 
@@ -666,8 +721,64 @@
 	 */
 	
 	@Override
-	public void arpRequestNotification(byte[] arpRequest) {
+	public void arpRequestNotification(ArpMessage arpMessage) {
 		log.debug("Received ARP notification from other instances");
-		broadcastArpRequestOutEdge(arpRequest, Long.MAX_VALUE, Short.MAX_VALUE);
+		
+		// Requests are sent out of the local edge ports; replies are passed
+		// on to whoever is waiting for that address
+		switch (arpMessage.getType()){
+		case REQUEST:
+			broadcastArpRequestOutMyEdge(arpMessage.getPacket());
+			break;
+		case REPLY:
+			sendArpReplyToWaitingRequesters(arpMessage.getAddress());
+			break;
+		}
+	}
+	
+	/**
+	 * Answers the ARP requests waiting on the given address, using the MAC
+	 * address that the device storage holds for it. If the device is not
+	 * known, nothing is sent and the waiting requests are left in place.
+	 *
+	 * @param address the IP address a reply was announced for
+	 */
+	private void sendArpReplyToWaitingRequesters(InetAddress address) {
+		log.debug("Sending ARP reply for {} to requesters", 
+				address.getHostAddress());
+		
+		// Look the device up before touching the pending requests, so that
+		// they are not discarded when the lookup finds nothing
+		IDeviceObject deviceObject = deviceStorage.getDeviceByIP(
+				InetAddresses.coerceToInteger(address));
+		if (deviceObject == null) {
+			log.debug("No device for {} in network map, not sending ARP reply",
+					address.getHostAddress());
+			return;
+		}
+		
+		MACAddress mac = MACAddress.valueOf(deviceObject.getMACAddress());
+		
+		log.debug("Found {} at {} in network map", 
+				address.getHostAddress(), mac);
+		
+		//See if anyone's waiting for this ARP reply
+		Set<ArpRequest> requests = arpRequests.get(address);
+		
+		//Synchronize on the Multimap while using an iterator for one of the sets
+		List<ArpRequest> requestsToSend = new ArrayList<ArpRequest>(requests.size());
+		synchronized (arpRequests) {
+			Iterator<ArpRequest> it = requests.iterator();
+			while (it.hasNext()) {
+				ArpRequest request = it.next();
+				it.remove();
+				requestsToSend.add(request);
+			}
+		}
+		
+		//Don't hold an ARP lock while dispatching requests
+		for (ArpRequest request : requestsToSend) {
+			request.dispatchReply(address, mac);
+		}
 	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java
index eeb307f..1355fe0 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java
@@ -3,12 +3,35 @@
 import java.util.ArrayList;
 import java.util.TreeMap;
 
-import com.esotericsoftware.kryo2.Kryo;
-
 import net.floodlightcontroller.util.MACAddress;
-
-import net.onrc.onos.ofcontroller.util.*;
+import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
 import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.CallerId;
+import net.onrc.onos.ofcontroller.util.DataPath;
+import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
+import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction;
+import net.onrc.onos.ofcontroller.util.FlowEntryActions;
+import net.onrc.onos.ofcontroller.util.FlowEntryErrorState;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
+import net.onrc.onos.ofcontroller.util.FlowEntrySwitchState;
+import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.FlowPathFlags;
+import net.onrc.onos.ofcontroller.util.FlowPathType;
+import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.IPv4;
+import net.onrc.onos.ofcontroller.util.IPv4Net;
+import net.onrc.onos.ofcontroller.util.IPv6;
+import net.onrc.onos.ofcontroller.util.IPv6Net;
+import net.onrc.onos.ofcontroller.util.Port;
+import net.onrc.onos.ofcontroller.util.Switch;
+import net.onrc.onos.ofcontroller.util.SwitchPort;
+
+import com.esotericsoftware.kryo2.Kryo;
 
 /**
  * Class factory for allocating Kryo instances for
@@ -129,6 +152,9 @@
 	kryo.register(TopologyElement.class);
 	kryo.register(TopologyElement.Type.class);
 	kryo.register(TreeMap.class);
+	
+	//ARP message
+	kryo.register(ArpMessage.class);
 
 	return kryo;
     }
diff --git a/titan/gremlin.sh b/titan/gremlin.sh
index e4895b0..55354e3 100755
--- a/titan/gremlin.sh
+++ b/titan/gremlin.sh
@@ -4,9 +4,18 @@
     MVN="mvn"
 fi
 
+BASE_DIR=$(dirname "$0")
 ONOS_DIR="`dirname $0`/.."
 #CP=$( echo `dirname $0`/../lib/*.jar `dirname $0`/../lib/titan/*.jar . | sed 's/ /:/g')
-CP=`${MVN} -f ${ONOS_DIR}/pom.xml dependency:build-classpath -Dmdep.outputFile=/dev/stdout -l /dev/stderr`
+#CP=`${MVN} -f ${ONOS_DIR}/pom.xml dependency:build-classpath -Dmdep.outputFile=/dev/stdout -l /dev/stderr`
+
+# Use a python script to parse the classpath out of the .classpath file
+CP=$("${BASE_DIR}/../scripts/parse-classpath.py")
+# Quote $CP when echoing so the script's multi-line error message keeps its line breaks
+if [[ "$CP" == *"Error reading classpath file"* ]]; then
+    echo "$CP"
+    exit 1
+fi
 
 # Find Java 
 if [ "$JAVA_HOME" = "" ] ; then
@@ -36,7 +45,9 @@
   if [ "$1" = "-v" ]; then
     $JAVA $JAVA_OPTIONS -cp $CP:$CLASSPATH com.tinkerpop.gremlin.Version
   else
+    pushd $BASE_DIR >/dev/null
     $JAVA $JAVA_OPTIONS -cp $CP:$CLASSPATH com.thinkaurelius.titan.tinkerpop.gremlin.Console
+    popd >/dev/null
   fi
 fi
 
diff --git a/titan/listDevices b/titan/listDevices
new file mode 100644
index 0000000..ea57f41
--- /dev/null
+++ b/titan/listDevices
@@ -0,0 +1,2 @@
+g.stopTransaction(SUCCESS);
+g.V('type', 'device').map;
\ No newline at end of file
diff --git a/titan/open b/titan/open
index 4442c0f..6efe890 100644
--- a/titan/open
+++ b/titan/open
@@ -1 +1 @@
-g = TitanFactory.open('cassandra.local')
\ No newline at end of file
+g = TitanFactory.open('/tmp/cassandra.titan')
\ No newline at end of file