Squashed commit of the following:
commit 2c41cec9145674b3bd48556c42d329c8ed1ae614
Merge: faea0fb e96a4eb
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Thu Jan 9 21:28:52 2014 -0800
Merge branch 'dbperf2' into mergetrial
Conflicts:
src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
commit e96a4ebff6f4a67693da61fcbec9cc18a940839e
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Thu Jan 9 17:47:02 2014 -0800
Modified API call counter.
commit cd0dceecc7cc6df51c7e0b85bff1a72cae5f15cc
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Thu Jan 9 15:41:57 2014 -0800
Added counter.
commit 93baeb6bf14124520dfcf4efe769018c7c80a0ce
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Thu Jan 9 15:13:13 2014 -0800
Added counter of DB access.
commit a460355ed57a67b69109855f57e26ef830754bfa
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Thu Jan 9 13:51:35 2014 -0800
Fixed bugs.
commit 19493d2d769568a798c2619d4fc70fa0d43a44ac
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Thu Jan 9 12:15:11 2014 -0800
Fixed measurment point
commit ffc64a6f4302dd955acc47b14c12fedd10c90ffc
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Thu Jan 9 11:54:08 2014 -0800
Modified measuring points.
commit 819e010f6131e5d00a689ea9848d38b4a554e1bd
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Thu Jan 9 11:05:20 2014 -0800
Interted measurement code.
commit 887d7c0884b84bc0b2c0e2cfdf772468161959fc
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Thu Jan 9 01:01:27 2014 -0800
Fixed a bug that ports are not read to memory.
commit f490f5a0677ceedcc0ef5b9e43383464b67b11be
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Thu Jan 9 00:20:28 2014 -0800
Fixed a bug that Link addition failed.
commit 8d12e8918a1fa80947d72d312f12cf132ca13335
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Wed Jan 8 21:53:45 2014 -0800
Merged Pankaj's refactoring code.
commit d3e8ad2b397cc5898a8218b6156721ab5c7df7bf
Author: Naoki Shiota <n-shiota@onlab.us>
Date: Tue Jan 7 11:36:48 2014 -0800
Added periodic execution of readFromDatabase
diff --git a/src/main/java/net/onrc/onos/graph/GraphDBOperation.java b/src/main/java/net/onrc/onos/graph/GraphDBOperation.java
index ab775b9..26e6181 100644
--- a/src/main/java/net/onrc/onos/graph/GraphDBOperation.java
+++ b/src/main/java/net/onrc/onos/graph/GraphDBOperation.java
@@ -83,7 +83,17 @@
}
return null;
}
-
+
+ /**
+ * Get all switch objects.
+ */
+ @Override
+ public Iterable<IPortObject> getAllPorts() {
+ FramedGraph<TitanGraph> fg = conn.getFramedGraph();
+ Iterable<IPortObject> ports = fg.getVertices("type","port",IPortObject.class);
+ return ports;
+ }
+
/**
* Get all switch objects.
*/
@@ -154,7 +164,7 @@
IPortObject obj = fg.addVertex(null,IPortObject.class);
if (obj != null) {
obj.setType("port");
- String id = dpid + portNumber.toString();
+ String id = dpid + PORT_ID_DELIM + portNumber.toString();
obj.setPortId(id);
obj.setNumber(portNumber);
}
@@ -187,7 +197,7 @@
public IPortObject searchPort(String dpid, Short number) {
FramedGraph<TitanGraph> fg = conn.getFramedGraph();
if ( fg == null ) return null;
- String id = dpid + number.toString();
+ String id = dpid + IDBOperation.PORT_ID_DELIM + number.toString();
Iterator<IPortObject> ports = fg.getVertices("port_id",id,IPortObject.class).iterator();
if ( ports.hasNext() ) {
return ports.next();
diff --git a/src/main/java/net/onrc/onos/graph/IDBOperation.java b/src/main/java/net/onrc/onos/graph/IDBOperation.java
index f873f27..33d01fb 100644
--- a/src/main/java/net/onrc/onos/graph/IDBOperation.java
+++ b/src/main/java/net/onrc/onos/graph/IDBOperation.java
@@ -9,6 +9,8 @@
import net.onrc.onos.ofcontroller.util.FlowId;
public interface IDBOperation {
+ public static final String PORT_ID_DELIM = "@";
+
public ISwitchObject newSwitch(String dpid);
public ISwitchObject searchSwitch(String dpid);
public ISwitchObject searchActiveSwitch(String dpid);
@@ -44,5 +46,5 @@
public void commit();
public void rollback();
public void close();
-
+ Iterable<IPortObject> getAllPorts();
}
diff --git a/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java b/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
index 932d422..c005304 100644
--- a/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
+++ b/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
@@ -1,5 +1,7 @@
package net.onrc.onos.graph;
+import java.util.Map;
+
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
import org.slf4j.Logger;
@@ -34,6 +36,7 @@
}
@Override
+ //public void edgeRemoved(Edge e, Map<String, Object> arg1) {
public void edgeRemoved(Edge e) {
// TODO Auto-generated method stub
// Fire NetMapEvents (LinkRemoved, FlowEntryRemoved, HostRemoved, PortRemoved)
@@ -72,6 +75,7 @@
}
@Override
+ //public void vertexRemoved(Vertex vertex, Map<String, Object> arg1) {
public void vertexRemoved(Vertex vertex) {
// TODO Auto-generated method stub
// Generate NetMapEvents
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
index 60f8e10..16220b9 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
@@ -482,6 +482,8 @@
log.debug("LinkStorageImpl:addLinkImpl failed link exists {} {} src {} dst {}",
new Object[]{op, lt, vportSrc, vportDst});
}
+ } else {
+ log.error("Ports not found : {}", lt);
}
return success;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 83f9731..3dc80c2 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -8,6 +8,8 @@
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -56,6 +58,7 @@
private final static Logger log = LoggerFactory.getLogger(FlowEventHandler.class);
private GraphDBOperation dbHandler;
+
private FlowManager flowManager; // The Flow Manager to use
private IDatagridService datagridService; // The Datagrid Service to use
private Topology topology; // The network topology
@@ -717,10 +720,14 @@
switch (eventEntry.eventType()) {
case ENTRY_ADD:
- isTopologyModified |= topology.addTopologyElement(topologyElement);
+ synchronized (topology) {
+ isTopologyModified |= topology.addTopologyElement(topologyElement);
+ }
break;
case ENTRY_REMOVE:
- isTopologyModified |= topology.removeTopologyElement(topologyElement);
+ synchronized (topology) {
+ isTopologyModified |= topology.removeTopologyElement(topologyElement);
+ }
break;
}
}
@@ -1017,8 +1024,12 @@
DataPath oldDataPath = flowPath.dataPath();
// Compute the new path
- DataPath newDataPath = TopologyManager.computeNetworkPath(topology,
+ DataPath newDataPath;
+ synchronized (topology) {
+ newDataPath = TopologyManager.computeNetworkPath(topology,
flowPath);
+ }
+
if (newDataPath == null) {
// We need the DataPath to compare the paths
newDataPath = new DataPath();
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index 740e782..ab2b31d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -242,12 +242,15 @@
*
*/
class FlowEntryWrapper {
- FlowEntryId flowEntryId;
- OFFlowStatisticsReply statisticsReply;
+ FlowEntryId flowEntryId;
+ IFlowEntry iFlowEntry;
+ OFFlowStatisticsReply statisticsReply;
+
public FlowEntryWrapper(IFlowEntry entry) {
flowEntryId = new FlowEntryId(entry.getFlowEntryId());
- }
+ iFlowEntry = entry;
+ }
public FlowEntryWrapper(OFFlowStatisticsReply entry) {
flowEntryId = new FlowEntryId(entry.getCookie());
@@ -268,18 +271,14 @@
double startDB = System.nanoTime();
// Get the Flow Entry state from the Network Graph
- IFlowEntry iFlowEntry = null;
- try {
- iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
- } catch (Exception e) {
- log.error("Error finding flow entry {} in Network Graph",
- flowEntryId);
- return;
- }
if (iFlowEntry == null) {
- log.error("Cannot add flow entry {} to sw {} : flow entry not found",
- flowEntryId, sw.getId());
- return;
+ try {
+ iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
+ } catch (Exception e) {
+ log.error("Error finding flow entry {} in Network Graph",
+ flowEntryId);
+ return;
+ }
}
dbTime = System.nanoTime() - startDB;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
index 20c85ab..d1f2af4 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
@@ -6,10 +6,14 @@
import java.util.TreeMap;
import net.onrc.onos.graph.GraphDBOperation;
+import net.onrc.onos.graph.IDBOperation;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
import net.onrc.onos.ofcontroller.core.ISwitchStorage.SwitchState;
import org.openflow.util.HexString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Vertex;
@@ -24,36 +28,37 @@
* paths.
*/
class Link {
- public Node me; // The node this link originates from
- public Node neighbor; // The neighbor node on the other side
- public int myPort; // Local port ID for the link
- public int neighborPort; // Neighbor port ID for the link
+ public Node me; // The node this link originates from
+ public Node neighbor; // The neighbor node on the other side
+ public int myPort; // Local port ID for the link
+ public int neighborPort; // Neighbor port ID for the link
- /**
- * Link constructor.
- *
- * @param me the node this link originates from.
- * @param the neighbor node on the other side of the link.
- * @param myPort local port ID for the link.
- * @param neighborPort neighbor port ID for the link.
- */
- public Link(Node me, Node neighbor, int myPort, int neighborPort) {
- this.me = me;
- this.neighbor = neighbor;
- this.myPort = myPort;
- this.neighborPort = neighborPort;
- }
+ /**
+ * Link constructor.
+ *
+ * @param me the node this link originates from.
+ * @param the neighbor node on the other side of the link.
+ * @param myPort local port ID for the link.
+ * @param neighborPort neighbor port ID for the link.
+ */
+ public Link(Node me, Node neighbor, int myPort, int neighborPort) {
+ this.me = me;
+ this.neighbor = neighbor;
+ this.myPort = myPort;
+ this.neighborPort = neighborPort;
+ }
};
public long nodeId; // The node ID
+ // TODO Change type of PortNumber to Short
public TreeMap<Integer, Link> links; // The links from this node:
- // (src PortID -> Link)
+ // (src PortNumber -> Link)
private TreeMap<Integer, Link> reverseLinksMap; // The links to this node:
- // (dst PortID -> Link)
+ // (dst PortNumber -> Link)
private TreeMap<Integer, Integer> portsMap; // The ports on this node:
- // (PortID -> PortID)
+ // (PortNumber -> PortNumber)
// TODO: In the future will be:
- // (PortID -> Port)
+ // (PortNumber -> Port)
/**
* Node constructor.
@@ -187,6 +192,11 @@
* A class for storing topology information.
*/
public class Topology {
+ private final static Logger log = LoggerFactory.getLogger(Topology.class);
+
+ // flag to use optimized readFromDatabase() method.
+ private static final boolean enableOptimizedRead = false;
+
private Map<Long, Node> nodesMap; // The dpid->Node mapping
/**
@@ -383,131 +393,171 @@
* @param dbHandler the Graph Database handler to use.
*/
public void readFromDatabase(GraphDBOperation dbHandler) {
- //
- // Fetch the relevant info from the Switch and Port vertices
- // from the Titan Graph.
- //
- nodesMap = new TreeMap<Long,Node>();
-
- Iterable<ISwitchObject> activeSwitches = dbHandler.getActiveSwitches();
- for (ISwitchObject switchObj : activeSwitches) {
- Vertex nodeVertex = switchObj.asVertex();
- //
- // The Switch info
- //
- String nodeDpid = nodeVertex.getProperty("dpid").toString();
- long nodeId = HexString.toLong(nodeDpid);
- Node me = nodesMap.get(nodeId);
- if (me == null)
- me = addNode(nodeId);
+ if (enableOptimizedRead) {
+ readFromDatabaseBodyOptimized(dbHandler);
+ } else {
+ readFromDatabaseBody(dbHandler);
+ }
- //
- // The local Port info
- //
- for (Vertex myPortVertex : nodeVertex.getVertices(Direction.OUT, "on")) {
- // Ignore inactive ports
- if (! myPortVertex.getProperty("state").toString().equals("ACTIVE"))
- continue;
-
- int myPort = 0;
- Object obj = myPortVertex.getProperty("number");
- if (obj instanceof Short) {
- myPort = (Short)obj;
- } else if (obj instanceof Integer) {
- myPort = (Integer)obj;
- }
- me.addPort(myPort);
-
- //
- // The neighbor Port info
- //
- for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
- // Ignore inactive ports
- if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE"))
- continue;
-
- int neighborPort = 0;
- obj = neighborPortVertex.getProperty("number");
- if (obj instanceof Short) {
- neighborPort = (Short)obj;
- } else if (obj instanceof Integer) {
- neighborPort = (Integer)obj;
- }
- //
- // The neighbor Switch info
- //
- for (Vertex neighborVertex : neighborPortVertex.getVertices(Direction.IN, "on")) {
- // Ignore inactive switches
- String state = neighborVertex.getProperty("state").toString();
- if (! state.equals(SwitchState.ACTIVE.toString()))
- continue;
-
- String neighborDpid = neighborVertex.getProperty("dpid").toString();
- long neighborId = HexString.toLong(neighborDpid);
- Node neighbor = nodesMap.get(neighborId);
- if (neighbor == null)
- neighbor = addNode(neighborId);
- neighbor.addPort(neighborPort);
- me.addLink(myPort, neighbor, neighborPort);
- }
- }
- }
- }
- dbHandler.commit();
}
- // TODO Merge into loops in readFromDatabase() can reduce execution time.
- /**
- * Check given two topology are identical or not.
- * @param topo1
- * @param topo2
- * @return true if identical
- */
- private boolean compareTopology(Map<Long,Node> topo1, Map<Long,Node> topo2) {
- if (topo1.size() != topo2.size()) {
- return false;
+ private void readFromDatabaseBody(GraphDBOperation dbHandler) {
+ //
+ // Fetch the relevant info from the Switch and Port vertices
+ // from the Titan Graph.
+ //
+
+ nodesMap.clear();
+ Iterable<ISwitchObject> activeSwitches = dbHandler.getActiveSwitches();
+ for (ISwitchObject switchObj : activeSwitches) {
+ Vertex nodeVertex = switchObj.asVertex();
+ //
+ // The Switch info
+ //
+ String nodeDpid = nodeVertex.getProperty("dpid").toString();
+ long nodeId = HexString.toLong(nodeDpid);
+ Node me = nodesMap.get(nodeId);
+ if (me == null)
+ me = addNode(nodeId);
+
+ //
+ // The local Port info
+ //
+ for (Vertex myPortVertex : nodeVertex.getVertices(Direction.OUT, "on")) {
+ // Ignore inactive ports
+ if (! myPortVertex.getProperty("state").toString().equals("ACTIVE"))
+ continue;
+
+ int myPort = 0;
+ Object obj = myPortVertex.getProperty("number");
+ if (obj instanceof Short) {
+ myPort = (Short)obj;
+ } else if (obj instanceof Integer) {
+ myPort = (Integer)obj;
+ }
+ me.addPort(myPort);
+
+ for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
+ // Ignore inactive ports
+ if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+ continue;
+ }
+
+ int neighborPort = 0;
+ obj = neighborPortVertex.getProperty("number");
+ if (obj instanceof Short) {
+ neighborPort = (Short)obj;
+ } else if (obj instanceof Integer) {
+ neighborPort = (Integer)obj;
+ }
+ //
+ // The neighbor Switch info
+ //
+ for (Vertex neighborVertex : neighborPortVertex.getVertices(Direction.IN, "on")) {
+ // Ignore inactive switches
+ String state = neighborVertex.getProperty("state").toString();
+ if (! state.equals(SwitchState.ACTIVE.toString()))
+ continue;
+
+ String neighborDpid = neighborVertex.getProperty("dpid").toString();
+ long neighborId = HexString.toLong(neighborDpid);
+ Node neighbor = nodesMap.get(neighborId);
+ if (neighbor == null)
+ neighbor = addNode(neighborId);
+ neighbor.addPort(neighborPort);
+ me.addLink(myPort, neighbor, neighborPort);
+ }
+ }
+ }
}
-
- for (Map.Entry<Long,Node> nodeEntry : topo1.entrySet()) {
- Long dpid = nodeEntry.getKey();
- if (! topo2.containsKey(dpid)) {
- return false;
- }
-
- Node n1 = nodeEntry.getValue();
- Node n2 = topo2.get(dpid);
-
- // check port identity
- if (n1.ports().size() != n2.ports().size()) {
- return false;
- }
- for (Integer port : n1.ports().keySet()) {
- if (! n2.ports().containsKey(port)) {
- return false;
- }
- }
-
- // check link identity
- if (n1.links.size() != n2.links.size()) {
- return false;
- }
- for (Map.Entry<Integer, Node.Link> linkEntry : n1.links.entrySet()) {
- Integer p1 = linkEntry.getKey();
- Node.Link l1 = linkEntry.getValue();
-
- if (! n2.links.containsKey(p1)) {
- return false;
- }
- Node.Link l2 = n2.links.get(p1);
-
- // Supposition: Link's "me" and "neighbor" is properly set.
- if (l1.myPort != l2.myPort ||
- l1.neighborPort != l2.neighborPort) {
- return false;
- }
- }
- }
- return true;
+ dbHandler.commit();
+ }
+
+ private void readFromDatabaseBodyOptimized(GraphDBOperation dbHandler) {
+ nodesMap.clear();
+
+ // Load all switches into Map
+ Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
+ for (ISwitchObject switchObj : switches) {
+ // Ignore inactive ports
+ if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
+ continue;
+ }
+ Vertex nodeVertex = switchObj.asVertex();
+ //
+ // The Switch info
+ //
+ String nodeDpid = nodeVertex.getProperty("dpid").toString();
+ long nodeId = HexString.toLong(nodeDpid);
+ addNode(nodeId);
+ }
+
+ //
+ // Get All Ports
+ //
+ Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
+ for (IPortObject myPortObj : ports) {
+ Vertex myPortVertex = myPortObj.asVertex();
+
+ // Ignore inactive ports
+ if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+ continue;
+ }
+
+ short myPort = 0;
+ String idStr = myPortObj.getPortId();
+ String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+ if (splitter.length != 2) {
+ log.error("Invalid port_id : {}", idStr);
+ continue;
+ }
+ String myDpid = splitter[0];
+ myPort = Short.parseShort(splitter[1]);
+ long myId = HexString.toLong(myDpid);
+ Node me = nodesMap.get(myId);
+
+ if (me == null) {
+ // cannot proceed ports and switches are out of sync
+ //TODO: Restart the whole read
+ continue;
+ }
+
+ if (me.getPort((int)myPort) == null) {
+ me.addPort((int)myPort);
+ } else if (me.getLink((int)myPort) != null) {
+ // Link already added..probably by neighbor
+ continue;
+ }
+
+ //
+ // The neighbor Port info
+ //
+ for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
+ // Ignore inactive ports
+ if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+ continue;
+ }
+ int neighborPort = 0;
+ idStr = neighborPortVertex.getProperty("port_id").toString();
+ splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+ if (splitter.length != 2) {
+ log.error("Invalid port_id : {}", idStr);
+ continue;
+ }
+ String neighborDpid = splitter[0];
+ neighborPort = Short.parseShort(splitter[1]);
+ long neighborId = HexString.toLong(neighborDpid);
+ Node neighbor = nodesMap.get(neighborId);
+ if (neighbor == null) {
+ continue;
+ }
+ if (neighbor.getPort(neighborPort) == null) {
+ neighbor.addPort(neighborPort);
+ }
+ me.addLink(myPort, neighbor, neighborPort);
+ }
+ }
+ dbHandler.commit();
}
// Only for debug use