Merge pull request #19 from effy/RAMCloud

batch flow path set properties
diff --git a/src/main/java/net/onrc/onos/graph/DBOperation.java b/src/main/java/net/onrc/onos/graph/DBOperation.java
index 730f727..6e278d9 100644
--- a/src/main/java/net/onrc/onos/graph/DBOperation.java
+++ b/src/main/java/net/onrc/onos/graph/DBOperation.java
@@ -27,7 +27,6 @@
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
 import net.onrc.onos.ofcontroller.core.ISwitchStorage;
-import net.onrc.onos.ofcontroller.flowmanager.FlowDatabaseOperation;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
 import net.onrc.onos.ofcontroller.util.FlowId;
 
@@ -67,6 +66,15 @@
 	}
 
 	/**
+	 * Get all port objects.
+	 */
+	@Override
+	public Iterable<IPortObject> getAllPorts() {
+	    Iterable<IPortObject> ports = conn.getFramedGraph().getVertices("type", "port", IPortObject.class);
+	    return ports;
+	}
+
+	/**
 	 * Get all switch objects.
 	 */
 	@Override
@@ -119,7 +127,7 @@
 	    IPortObject obj = (IPortObject) conn.getFramedGraph().addVertex(null, IPortObject.class);
 	    if (obj != null) {
 		obj.setType("port");
-		String id = dpid + portNum.toString();
+		String id = dpid + PORT_ID_DELIM + portNum.toString();
 		obj.setPortId(id);
 		obj.setNumber(portNum);
 	    }
@@ -151,7 +159,7 @@
 	public IPortObject searchPort(String dpid, Short number) {
 	    FramedGraph fg = conn.getFramedGraph();
 	    if ( fg == null ) return null;
-	    String id = dpid + number.toString();
+	    String id = dpid + PORT_ID_DELIM + number.toString();
 	    Iterator<IPortObject> it = fg.getVertices("port_id", id, IPortObject.class).iterator();
 	    return (it.hasNext()) ? it.next() : null;
 
diff --git a/src/main/java/net/onrc/onos/graph/IDBOperation.java b/src/main/java/net/onrc/onos/graph/IDBOperation.java
index cb67bf8..d2e8109 100644
--- a/src/main/java/net/onrc/onos/graph/IDBOperation.java
+++ b/src/main/java/net/onrc/onos/graph/IDBOperation.java
@@ -13,6 +13,8 @@
 import net.onrc.onos.ofcontroller.util.FlowId;
 
 public interface IDBOperation {
+	public static final String PORT_ID_DELIM = "@";
+
 	public ISwitchObject newSwitch(String dpid);
 	public ISwitchObject searchSwitch(String dpid);
 	public ISwitchObject searchActiveSwitch(String dpid);
@@ -21,13 +23,13 @@
 	public Iterable<ISwitchObject> getInactiveSwitches();
 	public Iterable<IFlowEntry> getAllSwitchNotUpdatedFlowEntries();
 	public void removeSwitch(ISwitchObject sw);
-	
+
 	@Deprecated
 	public IPortObject newPort(Short portNumber);
 	public IPortObject newPort(String dpid, Short portNum);
 	public IPortObject searchPort(String dpid, Short number);
 	public void removePort(IPortObject port);
-	
+
 	public IDeviceObject newDevice();
 	public IDeviceObject searchDevice(String macAddr);
 	public Iterable<IDeviceObject> getDevices();
@@ -46,9 +48,10 @@
 
 	public void setVertexProperties(Vertex vertex, Map<String, Object> map);
 
-	public IDBConnection getDBConnection();	
+	public IDBConnection getDBConnection();
 	public void commit();
 	public void rollback();
 	public void close();
-	
+
+	public Iterable<IPortObject> getAllPorts();
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index f0441b5..001fb3c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -51,11 +51,11 @@
     private final static Logger log = LoggerFactory.getLogger(FlowEventHandler.class);
 
     // Flag to refresh Topology object periodically
-    private final static boolean refreshTopology = true;
+    private final static boolean refreshTopology = false;
     // Refresh delay(ms)
     private final static long refreshTopologyDelay = 5000;
     // Refresh interval(ms)
-    private final static long refreshTopologyInterval = 5000;
+    private final static long refreshTopologyInterval = 2000;
     private Timer refreshTopologyTimer;
 
     private FlowManager flowManager;		// The Flow Manager to use
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
index 98a517c..1674cf0 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
@@ -1,15 +1,21 @@
 package net.onrc.onos.ofcontroller.topology;
 
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.TreeMap;
 
 import net.onrc.onos.graph.DBOperation;
+import net.onrc.onos.graph.IDBOperation;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
 import net.onrc.onos.ofcontroller.core.ISwitchStorage.SwitchState;
 
+import org.apache.commons.lang.StringUtils;
 import org.openflow.util.HexString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Vertex;
@@ -24,36 +30,37 @@
      * paths.
      */
     class Link {
-	public Node me;			// The node this link originates from
-	public Node neighbor;		// The neighbor node on the other side
-	public int myPort;		// Local port ID for the link
-	public int neighborPort;	// Neighbor port ID for the link
+        public Node me;                        // The node this link originates from
+        public Node neighbor;                // The neighbor node on the other side
+        public int myPort;                // Local port ID for the link
+        public int neighborPort;        // Neighbor port ID for the link
 
-	/**
-	 * Link constructor.
-	 *
-	 * @param me the node this link originates from.
-	 * @param the neighbor node on the other side of the link.
-	 * @param myPort local port ID for the link.
-	 * @param neighborPort neighbor port ID for the link.
-	 */
-	public Link(Node me, Node neighbor, int myPort, int neighborPort) {
-	    this.me = me;
-	    this.neighbor = neighbor;
-	    this.myPort = myPort;
-	    this.neighborPort = neighborPort;
-	}
+        /**
+         * Link constructor.
+         *
+         * @param me the node this link originates from.
+         * @param the neighbor node on the other side of the link.
+         * @param myPort local port ID for the link.
+         * @param neighborPort neighbor port ID for the link.
+         */
+        public Link(Node me, Node neighbor, int myPort, int neighborPort) {
+        	this.me = me;
+        	this.neighbor = neighbor;
+        	this.myPort = myPort;
+        	this.neighborPort = neighborPort;
+        }
     };
 
     public long nodeId;				// The node ID
+    // TODO Change type of PortNumber to Short
     public TreeMap<Integer, Link> links;	// The links from this node:
-						//     (src PortID -> Link)
+						//     (src PortNumber -> Link)
     private TreeMap<Integer, Link> reverseLinksMap; // The links to this node:
-						//     (dst PortID -> Link)
+						//     (dst PortNumber -> Link)
     private TreeMap<Integer, Integer> portsMap;	// The ports on this node:
-						//     (PortID -> PortID)
+						//     (PortNumber -> PortNumber)
 						// TODO: In the future will be:
-						//     (PortID -> Port)
+						//     (PortNumber -> Port)
 
     /**
      * Node constructor.
@@ -187,6 +194,8 @@
  * A class for storing topology information.
  */
 public class Topology {
+    private final static Logger log = LoggerFactory.getLogger(Topology.class);
+
     private Map<Long, Node> nodesMap;	// The dpid->Node mapping
 
     /**
@@ -383,73 +392,287 @@
      * @param dbHandler the Graph Database handler to use.
      */
     public void readFromDatabase(DBOperation dbHandler) {
-	//
-	// Fetch the relevant info from the Switch and Port vertices
-	// from the Titan Graph.
-	//
-	Iterable<ISwitchObject> activeSwitches = dbHandler.getActiveSwitches();
-	for (ISwitchObject switchObj : activeSwitches) {
-	    Vertex nodeVertex = switchObj.asVertex();
-	    //
-	    // The Switch info
-	    //
-	    String nodeDpid = nodeVertex.getProperty("dpid").toString();
-	    long nodeId = HexString.toLong(nodeDpid);
-	    Node me = nodesMap.get(nodeId);
-	    if (me == null)
-		me = addNode(nodeId);
+		//
+		// Fetch the relevant info from the Switch and Port vertices
+		// from the Titan Graph.
+		//
+    	nodesMap.clear();
 
-	    //
-	    // The local Port info
-	    //
-	    for (Vertex myPortVertex : nodeVertex.getVertices(Direction.OUT, "on")) {
-		// Ignore inactive ports
-		if (! myPortVertex.getProperty("state").toString().equals("ACTIVE"))
-		    continue;
+        // Load all switches into Map
+        Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
+        for (ISwitchObject switchObj : switches) {
+        	// Ignore inactive ports
+            if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
+            	continue;
+            }
+            Vertex nodeVertex = switchObj.asVertex();
+            //
+            // The Switch info
+            //
+            String nodeDpid = nodeVertex.getProperty("dpid").toString();
+            long nodeId = HexString.toLong(nodeDpid);
+            addNode(nodeId);
+        }
 
-		int myPort = 0;
-		Object obj = myPortVertex.getProperty("number");
-		if (obj instanceof Short) {
-		    myPort = (Short)obj;
-		} else if (obj instanceof Integer) {
-		    myPort = (Integer)obj;
-		}
+        //
+        // Get All Ports
+        //
+        Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
+        for (IPortObject myPortObj : ports) {
+            Vertex myPortVertex = myPortObj.asVertex();
+
+            // Ignore inactive ports
+            if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+            	continue;
+            }
+
+            short myPort = 0;
+            String idStr = myPortObj.getPortId();
+            String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+            if (splitter.length != 2) {
+            	log.error("Invalid port_id : {}", idStr);
+            	continue;
+            }
+            String myDpid = splitter[0];
+            myPort = Short.parseShort(splitter[1]);
+            long myId = HexString.toLong(myDpid);
+            Node me = nodesMap.get(myId);
+
+            if (me == null) {
+                // cannot proceed ports and switches are out of sync
+                //TODO: Restart the whole read
+                continue;
+            }
+
+            if (me.getPort(myPort) == null) {
+            	me.addPort(myPort);
+            } else if (me.getLink(myPort) != null) {
+                // Link already added..probably by neighbor
+                continue;
+            }
+
+            //
+            // The neighbor Port info
+            //
+            for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
+//            	log.debug("state : {}", neighborPortVertex.getProperty("state"));
+//            	log.debug("port id : {}", neighborPortVertex.getProperty("port_id"));
+                // Ignore inactive ports
+                if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+                	continue;
+                }
+                int neighborPort = 0;
+                idStr = neighborPortVertex.getProperty("port_id").toString();
+                splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+                if (splitter.length != 2) {
+                	log.error("Invalid port_id : {}", idStr);
+                	continue;
+                }
+                String neighborDpid = splitter[0];
+                neighborPort = Short.parseShort(splitter[1]);
+                long neighborId = HexString.toLong(neighborDpid);
+                Node neighbor = nodesMap.get(neighborId);
+//                log.debug("dpid {},{}  port {}", neighborDpid, neighborId, neighborPort);
+                if (neighbor == null) {
+                	continue;
+                }
+                me.addLink(myPort, neighbor, neighborPort);
+            }
+        }
+        dbHandler.commit();
+    }
+
+
+    // Only for debug use
+    List<Long> logGetSw = new ArrayList<Long>(100);
+    List<Long> logGetPt = new ArrayList<Long>(100);
+    List<Long> logAddSw = new ArrayList<Long>(100);
+    List<Long> logAddPt = new ArrayList<Long>(100);
+    List<Long> logAddLk = new ArrayList<Long>(100);
+    List<Long> logCommit = new ArrayList<Long>(100);
+    List<Integer> logGetVertices = new ArrayList<Integer>(100);
+    List<Integer> logGetProperty = new ArrayList<Integer>(100);
+       public void readFromDatabaseBreakdown(DBOperation dbHandler) {
+    	int getVerticesCount = 0;
+    	int getPropertyCount = 0;
+    	int getVCount_sw = 0;
+    	int getVCount_pt = 0;
+    	int getVCount_lk = 0;
+    	int getPCount_sw = 0;
+    	int getPCount_pt = 0;
+    	int getPCount_lk = 0;
 
 		//
-		// The neighbor Port info
+		// Fetch the relevant info from the Switch and Port vertices
+		// from the Titan Graph.
 		//
-		for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
-		    // Ignore inactive ports
-		    if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE"))
-			continue;
 
-		    int neighborPort = 0;
-		    obj = neighborPortVertex.getProperty("number");
-		    if (obj instanceof Short) {
-			neighborPort = (Short)obj;
-		    } else if (obj instanceof Integer) {
-			neighborPort = (Integer)obj;
-		    }
-		    //
-		    // The neighbor Switch info
-		    //
-		    for (Vertex neighborVertex : neighborPortVertex.getVertices(Direction.IN, "on")) {
-			// Ignore inactive switches
-			String state = neighborVertex.getProperty("state").toString();
-			if (! state.equals(SwitchState.ACTIVE.toString()))
-			    continue;
+    	nodesMap.clear();
+    	long t1 = System.nanoTime();
 
-			String neighborDpid = neighborVertex.getProperty("dpid").toString();
-			long neighborId = HexString.toLong(neighborDpid);
-			Node neighbor = nodesMap.get(neighborId);
-			if (neighbor == null)
-			    neighbor = addNode(neighborId);
-			me.addLink(myPort, neighbor, neighborPort);
-		    }
-		}
-	    }
-	}
-	dbHandler.commit();
+        // Load all switches into Map
+        Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
+
+        long t2 = System.nanoTime();
+
+        long t_addSw = 0;
+        for (ISwitchObject switchObj : switches) {
+            long t3 = System.nanoTime();
+            long t4;
+
+        	// Ignore inactive ports
+            ++getPropertyCount;
+            ++getPCount_sw;
+            if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
+                t4 = System.nanoTime();
+                t_addSw += t4 - t3;
+            	continue;
+            }
+            Vertex nodeVertex = switchObj.asVertex();
+            //
+            // The Switch info
+            //
+            ++getPropertyCount;
+            ++getPCount_sw;
+            String nodeDpid = nodeVertex.getProperty("dpid").toString();
+            long nodeId = HexString.toLong(nodeDpid);
+            addNode(nodeId);
+            t4 = System.nanoTime();
+            t_addSw += t4 - t3;
+        }
+
+        long t5 = System.nanoTime();
+        //
+        // Get All Ports
+        //
+        Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
+
+        long t6 = System.nanoTime();
+        long t_addPort = 0;
+        long t_addLink = 0;
+
+        for (IPortObject myPortObj : ports) {
+            long t7 = System.nanoTime();
+            long t8;
+            Vertex myPortVertex = myPortObj.asVertex();
+
+            // Ignore inactive ports
+            ++getPropertyCount;
+            ++getPCount_pt;
+            if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+                t8 = System.nanoTime();
+                t_addPort += t8 - t7;
+            	continue;
+            }
+
+            short myPort = 0;
+            ++getPropertyCount;
+            ++getPCount_pt;
+            String idStr = myPortObj.getPortId();
+            String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+            if (splitter.length != 2) {
+            	log.error("Invalid port_id : {}", idStr);
+                t8 = System.nanoTime();
+                t_addPort += t8 - t7;
+            	continue;
+            }
+            String myDpid = splitter[0];
+            myPort = Short.parseShort(splitter[1]);
+            long myId = HexString.toLong(myDpid);
+            Node me = nodesMap.get(myId);
+
+            if (me == null) {
+                // cannot proceed ports and switches are out of sync
+                //TODO: Restart the whole read
+                t8 = System.nanoTime();
+                t_addPort += t8 - t7;
+                continue;
+            }
+
+            if (me.getPort(myPort) == null) {
+            	me.addPort(myPort);
+            } else if (me.getLink(myPort) != null) {
+                // Link already added..probably by neighbor
+                t8 = System.nanoTime();
+                t_addPort += t8 - t7;
+                continue;
+            }
+            t8 = System.nanoTime();
+            t_addPort += t8 - t7;
+
+            //
+            // The neighbor Port info
+            //
+            ++getVerticesCount;
+            ++getVCount_pt;
+            for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
+//            	log.debug("state : {}", neighborPortVertex.getProperty("state"));
+//            	log.debug("port id : {}", neighborPortVertex.getProperty("port_id"));
+
+                long t9 = System.nanoTime();
+                long t10;
+
+                // Ignore inactive ports
+                ++getPropertyCount;
+                ++getPCount_lk;
+                if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+                    t10 = System.nanoTime();
+                    t_addLink += t10 - t9;
+                	continue;
+                }
+                int neighborPort = 0;
+                ++getPropertyCount;
+                ++getPCount_lk;
+                idStr = neighborPortVertex.getProperty("port_id").toString();
+                splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+                if (splitter.length != 2) {
+                	log.error("Invalid port_id : {}", idStr);
+                    t10 = System.nanoTime();
+                    t_addLink += t10 - t9;
+                	continue;
+                }
+                String neighborDpid = splitter[0];
+                neighborPort = Short.parseShort(splitter[1]);
+                long neighborId = HexString.toLong(neighborDpid);
+                Node neighbor = nodesMap.get(neighborId);
+//                log.debug("dpid {},{}  port {}", neighborDpid, neighborId, neighborPort);
+                if (neighbor == null) {
+                    t10 = System.nanoTime();
+                    t_addLink += t10 - t9;
+                	continue;
+                }
+                me.addLink(myPort, neighbor, neighborPort);
+
+                t10 = System.nanoTime();
+                t_addLink += t10 - t9;
+            }
+        }
+        long t11 = System.nanoTime();
+        dbHandler.commit();
+        long t12 = System.nanoTime();
+
+        logGetSw.add((t2-t1)/1000);
+        logGetPt.add((t6-t5)/1000);
+        logAddSw.add(t_addSw/1000);
+        logAddPt.add(t_addPort/1000);
+        logAddLk.add(t_addLink/1000);
+        logCommit.add((t12-t11)/1000);
+        logGetVertices.add(getVerticesCount);
+        logGetProperty.add(getPropertyCount);
+        log.debug("getVertices[N({}),P({}),L({})] getProperty[N({}),P({}),L({})]",
+        		new Object[]{getVCount_sw,getVCount_pt,getVCount_lk,
+        		getPCount_sw,getPCount_pt,getPCount_lk});
+    }
+
+    public void printMeasuredLog() {
+    	log.debug("getsw: {}", StringUtils.join(logGetSw, ","));
+    	log.debug("getpt: {}", StringUtils.join(logGetPt, ","));
+    	log.debug("addsw: {}", StringUtils.join(logAddSw, ","));
+    	log.debug("addpt: {}", StringUtils.join(logAddPt, ","));
+    	log.debug("addlk: {}", StringUtils.join(logAddLk, ","));
+    	log.debug("commit: {}", StringUtils.join(logCommit, ","));
+    	log.debug("getvertices: {}", StringUtils.join(logGetVertices, ","));
+    	log.debug("getproperty: {}", StringUtils.join(logGetProperty, ","));
     }
 
     // Only for debug use