Merge branch 'master' of https://github.com/OPENNETWORKINGLAB/ONOS
diff --git a/README.md b/README.md
index b71b9aa..9f2c040 100644
--- a/README.md
+++ b/README.md
@@ -15,12 +15,12 @@
0. Install custom jars and dependencies (needs to be run only once)
- $ ./setup-local-maven.sh
+ $ ./setup-local-maven.sh
1. Cleanly build ONOS
- $ mvn clean
- $ mvn compile
+ $ mvn clean
+ $ mvn compile
NOTE: installing maven for the first time may switch java version
from 1.7 to 1.6. This might prevent Cassandra to run.
@@ -28,17 +28,19 @@
Dependencies
------------
1. Zookeeper
+
Download and install apache-zookeeper-3.4.5:
http://zookeeper.apache.org/releases.html
- Edit file (ONOS-INSTALL-DIR)/start-zk.sh and set variable "ZK_DIR"
+ Edit file `(ONOS-INSTALL-DIR)/start-zk.sh` and set variable "ZK_DIR"
to point to the Zookeeper directory.
2. Cassandra
+
Download and install apache-cassandra-1.2.4:
http://cassandra.apache.org/download/
- Edit file (ONOS-INSTALL-DIR)/start-cassandra.sh and set variable
+ Edit file `(ONOS-INSTALL-DIR)/start-cassandra.sh` and set variable
"CASSANDRA_DIR" to point to the Cassandra directory.
Running ONOS with Cassandra as a separate process
@@ -47,62 +49,62 @@
1. Start Zookeeper
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-zk.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-zk.sh start
- ## Confirm Zookeeper is running:
- $ ./start.zk.sh status
+ ## Confirm Zookeeper is running:
+ $ ./start.zk.sh status
2. Start Cassandra
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-cassandra.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-cassandra.sh start
- ## Confirm Cassandra is running:
- $ ./start-cassandra.sh status
+ ## Confirm Cassandra is running:
+ $ ./start-cassandra.sh status
3. Start ONOS
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-onos.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-onos.sh start
- ## Confirm ONOS is running:
- $ ./start-onos.sh status
-
+ ## Confirm ONOS is running:
+ $ ./start-onos.sh status
+
4. Start ONOS REST API server
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-rest.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-rest.sh start
- ## Confirm the REST API server is running:
- $ ./start-rest.sh status
+ ## Confirm the REST API server is running:
+ $ ./start-rest.sh status
Running ONOS with Cassandra embedded (Optional)
-----------------------------------------------
1. Start Zookeeper
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-zk.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-zk.sh start
- ## Confirm Zookeeper is running:
- $ ./start.zk.sh status
-
+ ## Confirm Zookeeper is running:
+ $ ./start.zk.sh status
+
2. Start ONOS and Cassandra embedded
- $ cd (ONOS-INSTALL_DIR)/
- $ ./start-onos-embedded.sh start
+ $ cd (ONOS-INSTALL_DIR)/
+ $ ./start-onos-embedded.sh start
- ## Confirm ONOS is running:
- $ ./start-onos-embedded.sh status
-
+ ## Confirm ONOS is running:
+ $ ./start-onos-embedded.sh status
+
3. Start ONOS REST API server
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-rest.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-rest.sh start
- ## Confirm the REST API server is running:
- $ ./start-rest.sh status
+ ## Confirm the REST API server is running:
+ $ ./start-rest.sh status
Running in offline mode (Optional)
@@ -110,9 +112,9 @@
Maven is used to build and run ONOS. By default, maven tries to reach
the repositories. The '-o' option can be given to the 'mvn' command to
-suppress this behavior. The MVN environmental variable can be used to
+suppress this behavior. The `MVN` environmental variable can be used to
set additional options to the 'mvn' command used in ONOS.
* Example: Running in offline mode
- $ env MVN="mvn -o" ./start-onos.sh start
+ $ env MVN="mvn -o" ./start-onos.sh start
diff --git a/cluster-mgmt/bin/cho-link-failure.sh b/cluster-mgmt/bin/cho-link-failure.sh
index 6c5f128..4db887a 100755
--- a/cluster-mgmt/bin/cho-link-failure.sh
+++ b/cluster-mgmt/bin/cho-link-failure.sh
@@ -16,7 +16,6 @@
dsh -w ${basename}1 "cd ONOS/scripts; ./all-linkup.sh"
echo "clean up flow"
dsh -w ${basename}1 "cd ONOS/web; ./delete_flow.py 1 100"
-dsh -w ${basename}1 "cd ONOS/web; ./clear_flow.py 1 100"
sleep 1
dsh -w ${basename}1 "cd ONOS/web; ./get_flow.py all"
dsh "cd ONOS/scripts; ./delflow.sh"
diff --git a/cluster-mgmt/bin/demo-reset-hw.sh b/cluster-mgmt/bin/demo-reset-hw.sh
index 8c586f5..15f97e1 100755
--- a/cluster-mgmt/bin/demo-reset-hw.sh
+++ b/cluster-mgmt/bin/demo-reset-hw.sh
@@ -9,7 +9,6 @@
echo "cleanup excess flows"
$DIR/web/delete_flow.py 201 300
-$DIR/web/clear_flow.py 201 300
echo "cleanup excess flows done"
echo "Adding 200 flows"
$DIR/web/add_flow.py -m onos -f $DIR/web/flowdef_demo_start.txt
diff --git a/cluster-mgmt/bin/test-link-failure.sh b/cluster-mgmt/bin/test-link-failure.sh
index 6c5f128..4db887a 100755
--- a/cluster-mgmt/bin/test-link-failure.sh
+++ b/cluster-mgmt/bin/test-link-failure.sh
@@ -16,7 +16,6 @@
dsh -w ${basename}1 "cd ONOS/scripts; ./all-linkup.sh"
echo "clean up flow"
dsh -w ${basename}1 "cd ONOS/web; ./delete_flow.py 1 100"
-dsh -w ${basename}1 "cd ONOS/web; ./clear_flow.py 1 100"
sleep 1
dsh -w ${basename}1 "cd ONOS/web; ./get_flow.py all"
dsh "cd ONOS/scripts; ./delflow.sh"
diff --git a/cluster-mgmt/template/onsdemo_edge_template.py b/cluster-mgmt/template/onsdemo_edge_template.py
index e340f38..c3d0287 100755
--- a/cluster-mgmt/template/onsdemo_edge_template.py
+++ b/cluster-mgmt/template/onsdemo_edge_template.py
@@ -65,7 +65,7 @@
switch.append(sw)
for i in range (NR_NODES):
- host.append(self.addHost( 'host%d' % (int(i)+1) ))
+ host.append(self.addHost( 'host%d.%d' % (NWID, int(i)+1) ))
for i in range (NR_NODES):
self.addLink(host[i], switch[i])
@@ -117,7 +117,7 @@
host = []
for i in range (NR_NODES):
- host.append(net.get( 'host%d' % (int(i)+1) ))
+ host.append(net.get( 'host%d.%d' % (NWID, (int(i)+1)) ))
net.start()
diff --git a/kryo2/pom.xml b/kryo2/pom.xml
index 1beb87d..788f952 100644
--- a/kryo2/pom.xml
+++ b/kryo2/pom.xml
@@ -7,7 +7,7 @@
<version>2.22</version>
<packaging>jar</packaging>
- <name>kyro2</name>
+ <name>kryo2</name>
<url>http://maven.apache.org</url>
<properties>
diff --git a/scripts/demo-reset-sw.sh b/scripts/demo-reset-sw.sh
index 65a2ff1..e6cabae 100755
--- a/scripts/demo-reset-sw.sh
+++ b/scripts/demo-reset-sw.sh
@@ -6,7 +6,6 @@
$DIR/scripts/all-linkup.sh
echo "Delete Flows"
$DIR/web/delete_flow.py 201 300
-$DIR/web/clear_flow.py 201 300
echo "Adding Flows"
$DIR/web/add_flow.py -m onos -f $DIR/web/flowdef_demo_start.txt
ssh -i ~/.ssh/onlabkey.pem ${basename}5 'ONOS/start-onos.sh stop'
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 180cbe9..775f952 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -607,6 +607,29 @@
}
/**
+ * Get a Flow for a given Flow ID.
+ *
+ * @param flowId the Flow ID of the Flow to get.
+ * @return the Flow if found, otherwise null.
+ */
+ @Override
+ public FlowPath getFlow(FlowId flowId) {
+ byte[] valueBytes = mapFlow.get(flowId.value());
+ if (valueBytes == null)
+ return null;
+
+ Kryo kryo = kryoFactory.newKryo();
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+ kryoFactory.deleteKryo(kryo);
+
+ return flowPath;
+ }
+
+ /**
* Send a notification that a Flow is added.
*
* @param flowPath the Flow that is added.
@@ -702,6 +725,29 @@
}
/**
+ * Get a Flow Entry for a given Flow Entry ID.
+ *
+ * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
+ * @return the Flow Entry if found, otherwise null.
+ */
+ @Override
+ public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
+ byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
+ if (valueBytes == null)
+ return null;
+
+ Kryo kryo = kryoFactory.newKryo();
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
+ kryoFactory.deleteKryo(kryo);
+
+ return flowEntry;
+ }
+
+ /**
* Send a notification that a FlowEntry is added.
*
* @param flowEntry the FlowEntry that is added.
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 9361341..034fe25 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -50,7 +50,7 @@
* @param arpEventHandler The ARP event handler to de-register.
*/
public void deregisterArpEventHandler(IArpEventHandler arpEventHandler);
-
+
/**
* Get all Flows that are currently in the datagrid.
*
@@ -59,6 +59,14 @@
Collection<FlowPath> getAllFlows();
/**
+ * Get a Flow for a given Flow ID.
+ *
+ * @param flowId the Flow ID of the Flow to get.
+ * @return the Flow if found, otherwise null.
+ */
+ FlowPath getFlow(FlowId flowId);
+
+ /**
* Send a notification that a Flow is added.
*
* @param flowPath the Flow that is added.
@@ -92,6 +100,14 @@
Collection<FlowEntry> getAllFlowEntries();
/**
+ * Get a Flow Entry for a given Flow Entry ID.
+ *
+ * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
+ * @return the Flow Entry if found, otherwise null.
+ */
+ FlowEntry getFlowEntry(FlowEntryId flowEntryId);
+
+ /**
* Send a notification that a FlowEntry is added.
*
* @param flowEntry the FlowEntry that is added.
diff --git a/src/main/java/net/onrc/onos/flow/FlowManagerImpl.java b/src/main/java/net/onrc/onos/flow/FlowManagerImpl.java
deleted file mode 100644
index 9865deb..0000000
--- a/src/main/java/net/onrc/onos/flow/FlowManagerImpl.java
+++ /dev/null
@@ -1,331 +0,0 @@
-package net.onrc.onos.flow;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-
-import org.openflow.util.HexString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.tinkerpop.blueprints.Direction;
-import com.tinkerpop.blueprints.Vertex;
-
-import net.floodlightcontroller.core.IOFSwitch;
-import net.onrc.onos.graph.GraphDBOperation;
-import net.onrc.onos.graph.LocalTopologyEventListener;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
-import net.onrc.onos.ofcontroller.core.ISwitchStorage.SwitchState;
-import net.onrc.onos.ofcontroller.util.DataPath;
-import net.onrc.onos.ofcontroller.util.Dpid;
-import net.onrc.onos.ofcontroller.util.FlowEntry;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction;
-import net.onrc.onos.ofcontroller.util.FlowEntryActions;
-import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
-import net.onrc.onos.ofcontroller.util.FlowPath;
-import net.onrc.onos.ofcontroller.util.Port;
-import net.onrc.onos.ofcontroller.util.SwitchPort;
-
-public class FlowManagerImpl implements IFlowManager {
-
- protected final static Logger log = LoggerFactory.getLogger(LocalTopologyEventListener.class);
- protected GraphDBOperation op;
-
- @Override
- public void createFlow(IPortObject src_port, IPortObject dest_port) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public Iterable<FlowPath> getFlows(IPortObject src_port,
- IPortObject dest_port) {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Iterable<FlowPath> getOutFlows(IPortObject port) {
- // TODO Auto-generated method stub
- List<FlowPath> flowPaths = new ArrayList<FlowPath> ();
- Iterable<IFlowEntry> flowEntries = port.getOutFlowEntries();
-
- for(IFlowEntry fe: flowEntries) {
- IFlowPath flow = fe.getFlow();
- FlowPath flowPath = new FlowPath(flow);
- flowPaths.add(flowPath);
- }
- return flowPaths;
- }
-
- @Override
- public void reconcileFlows(IPortObject src_port) {
- // TODO Auto-generated method stub
-
- log.debug("Reconcile Flows for Port removed: {}:{}",src_port.getSwitch().getDPID(),src_port.getNumber());
- Iterable<IFlowEntry> flowEntries = src_port.getOutFlowEntries();
-
- for(IFlowEntry fe: flowEntries) {
- IFlowPath flow = fe.getFlow();
- reconcileFlow(flow);
- }
- }
-
- private void reconcileFlow(IFlowPath flow) {
- // TODO Auto-generated method stub
- String src_dpid = flow.getSrcSwitch();
- String dst_dpid = flow.getDstSwitch();
- Short src_port = flow.getSrcPort();
- Short dst_port = flow.getDstPort();
- IPortObject src = null;
- IPortObject dst = null;
- src = op.searchPort(src_dpid, src_port);
- dst = op.searchPort(dst_dpid, dst_port);
- if (src != null && dst != null) {
- FlowPath newFlow = this.computeFlowPath(src,dst);
- installFlow(newFlow);
- removeFlow(flow);
- }
-
- }
-
- private void removeFlow(IFlowPath flow) {
- // TODO Auto-generated method stub
-
- }
-
- private void installFlow(FlowPath newFlow) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void reconcileFlow(IPortObject src_port, IPortObject dest_port) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public FlowPath computeFlowPath(IPortObject src_port, IPortObject dest_port) {
- // TODO Auto-generated method stub
- DataPath dataPath = new DataPath();
-
- // FIXME: Bad idea to use FloodLight data structures (SwitchPort)
-
- dataPath.setSrcPort(new SwitchPort(new Dpid(src_port.getSwitch().getDPID()),
- new Port(src_port.getNumber())));
- dataPath.setDstPort(new SwitchPort(new Dpid(src_port.getSwitch().getDPID()),
- new Port(src_port.getNumber())));
-
- if (src_port.getSwitch().equals(dest_port.getSwitch())) {
- // on same switch create quick path
- FlowEntry flowEntry = new FlowEntry();
- flowEntry.setDpid(new Dpid(src_port.getSwitch().getDPID()));
- flowEntry.setInPort(new Port(src_port.getNumber()));
- flowEntry.setOutPort(new Port(src_port.getNumber()));
- flowEntry.setFlowEntryMatch(new FlowEntryMatch());
- flowEntry.flowEntryMatch().enableInPort(flowEntry.inPort());
-
- // Set the outgoing port output action
- FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- FlowEntryAction flowEntryAction = new FlowEntryAction();
- flowEntryAction.setActionOutput(flowEntry.outPort());
- flowEntryActions.addAction(flowEntryAction);
- dataPath.flowEntries().add(flowEntry);
-
- FlowPath flowPath = new FlowPath();
- flowPath.setDataPath(dataPath);
-
- return flowPath;
- }
- Vertex v_src = src_port.getSwitch().asVertex();
- Vertex v_dest = dest_port.getSwitch().asVertex();
-
- //
- // Implement the Shortest Path computation by using Breath First Search
- //
- Set<Vertex> visitedSet = new HashSet<Vertex>();
- Queue<Vertex> processingList = new LinkedList<Vertex>();
- Map<Vertex, Vertex> previousVertexMap = new HashMap<Vertex, Vertex>();
-
- processingList.add(v_src);
- visitedSet.add(v_src);
- Boolean path_found = false;
- while (! processingList.isEmpty()) {
- Vertex nextVertex = processingList.poll();
- if (v_dest.equals(nextVertex)) {
- path_found = true;
- break;
- }
- for (Vertex parentPort : nextVertex.getVertices(Direction.OUT, "on")) {
- for (Vertex childPort : parentPort.getVertices(Direction.OUT, "link")) {
- for (Vertex child : childPort.getVertices(Direction.IN, "on")) {
- // Ignore inactive switches
- String state = child.getProperty("state").toString();
- if (! state.equals(SwitchState.ACTIVE.toString()))
- continue;
-
- if (! visitedSet.contains(child)) {
- previousVertexMap.put(parentPort, nextVertex);
- previousVertexMap.put(childPort, parentPort);
- previousVertexMap.put(child, childPort);
- visitedSet.add(child);
- processingList.add(child);
- }
- }
- }
- }
- }
- if (! path_found) {
- return null; // No path found
- }
-
- List<Vertex> resultPath = new LinkedList<Vertex>();
- Vertex previousVertex = v_dest;
- resultPath.add(v_dest);
- while (! v_src.equals(previousVertex)) {
- Vertex currentVertex = previousVertexMap.get(previousVertex);
- resultPath.add(currentVertex);
- previousVertex = currentVertex;
- }
- Collections.reverse(resultPath);
-
- // Loop through the result and prepare the return result
- // as a list of Flow Entries.
- //
- long nodeId = 0;
- short portId = 0;
- Port inPort = new Port(src_port.getNumber());
- Port outPort = new Port();
- int idx = 0;
- for (Vertex v: resultPath) {
- String type = v.getProperty("type").toString();
- // System.out.println("type: " + type);
- if (type.equals("port")) {
- //String number = v.getProperty("number").toString();
- // System.out.println("number: " + number);
-
- Object obj = v.getProperty("number");
- // String class_str = obj.getClass().toString();
- if (obj instanceof Short) {
- portId = (Short)obj;
- } else if (obj instanceof Integer) {
- Integer int_nodeId = (Integer)obj;
- portId = int_nodeId.shortValue();
- // int int_nodeId = (Integer)obj;
- // portId = (short)int_nodeId.;
- }
- } else if (type.equals("switch")) {
- String dpid = v.getProperty("dpid").toString();
- nodeId = HexString.toLong(dpid);
-
- // System.out.println("dpid: " + dpid);
- }
- idx++;
- if (idx == 1) {
- continue;
- }
- int mod = idx % 3;
- if (mod == 0) {
- // Setup the incoming port
- inPort = new Port(portId);
- continue;
- }
- if (mod == 2) {
- // Setup the outgoing port, and add the Flow Entry
- outPort = new Port(portId);
-
- FlowEntry flowEntry = new FlowEntry();
- flowEntry.setDpid(new Dpid(nodeId));
- flowEntry.setInPort(inPort);
- flowEntry.setOutPort(outPort);
- flowEntry.setFlowEntryMatch(new FlowEntryMatch());
- flowEntry.flowEntryMatch().enableInPort(flowEntry.inPort());
-
- // Set the outgoing port output action
- FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- FlowEntryAction flowEntryAction = new FlowEntryAction();
- flowEntryAction.setActionOutput(flowEntry.outPort());
- flowEntryActions.addAction(flowEntryAction);
- dataPath.flowEntries().add(flowEntry);
- continue;
- }
- }
- if (idx > 0) {
- // Add the last Flow Entry
- FlowEntry flowEntry = new FlowEntry();
- flowEntry.setDpid(new Dpid(nodeId));
- flowEntry.setInPort(inPort);
- flowEntry.setOutPort(new Port(dest_port.getNumber()));
- flowEntry.setFlowEntryMatch(new FlowEntryMatch());
- flowEntry.flowEntryMatch().enableInPort(flowEntry.inPort());
-
- // Set the outgoing port output action
- FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- FlowEntryAction flowEntryAction = new FlowEntryAction();
- flowEntryAction.setActionOutput(flowEntry.outPort());
- flowEntryActions.addAction(flowEntryAction);
- dataPath.flowEntries().add(flowEntry);
- // TODO (BOC): why is this twice?
- dataPath.flowEntries().add(flowEntry);
- }
-
-
- if (dataPath.flowEntries().size() > 0) {
- FlowPath flowPath = new FlowPath();
- flowPath.setDataPath(dataPath);
-
- return flowPath;
- }
- return null;
-
- }
-
- @Override
- public Iterable<FlowEntry> getFlowEntries(FlowPath flow) {
- // TODO Auto-generated method stub
- return null;
- }
-
-
- @Override
- public boolean installRemoteFlowEntry(FlowPath flowPath,
- FlowEntry entry) {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public boolean removeRemoteFlowEntry(FlowPath flowPath,
- FlowEntry entry) {
- return false;
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public boolean installFlowEntry(IOFSwitch mySwitch,
- FlowPath flowPath,
- FlowEntry flowEntry) {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public boolean removeFlowEntry(IOFSwitch mySwitch,
- FlowPath flowPath,
- FlowEntry flowEntry) {
- // TODO Auto-generated method stub
- return false;
- }
-
-
-}
diff --git a/src/main/java/net/onrc/onos/flow/IFlowManager.java b/src/main/java/net/onrc/onos/flow/IFlowManager.java
deleted file mode 100644
index 598da85..0000000
--- a/src/main/java/net/onrc/onos/flow/IFlowManager.java
+++ /dev/null
@@ -1,126 +0,0 @@
-package net.onrc.onos.flow;
-
-import net.floodlightcontroller.core.IOFSwitch;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
-import net.onrc.onos.ofcontroller.util.FlowEntry;
-import net.onrc.onos.ofcontroller.util.FlowPath;
-
-public interface IFlowManager {
- /**
- * Create a Flow from port to port.
- *
- * TODO: We don't need it for now.
- *
- * @param src_port the source port.
- * @param dest_port the destination port.
- */
- public void createFlow(IPortObject src_port, IPortObject dest_port);
-
- /**
- * Get all Flows matching a source and a destination port.
- *
- * TODO: Pankaj might be implementing it later.
- *
- * @param src_port the source port to match.
- * @param dest_port the destination port to match.
- * @return all flows matching the source and the destination port.
- */
- public Iterable<FlowPath> getFlows(IPortObject src_port,
- IPortObject dest_port);
-
- /**
- * Get all Flows going out from a port.
- *
- * TODO: We need it now: Pankaj
- *
- * @param port the port to match.
- * @return the list of flows that are going out from the port.
- */
- public Iterable<FlowPath> getOutFlows(IPortObject port);
-
- /**
- * Reconcile all flows on inactive switch port.
- *
- * @param portObject the port that has become inactive.
- */
- public void reconcileFlows(IPortObject portObject);
-
- /**
- * Reconcile all flows between a source and a destination port.
- *
- * TODO: We don't need it for now.
- *
- * @param src_port the source port.
- * @param dest_port the destination port.
- */
- public void reconcileFlow(IPortObject src_port, IPortObject dest_port);
-
- /**
- * Compute the shortest path between a source and a destination ports.
- *
- * @param src_port the source port.
- * @param dest_port the destination port.
- * @return the computed shortest path between the source and the
- * destination ports. The flow entries in the path itself would
- * contain the incoming port matching and the outgoing port output
- * actions set. However, the path itself will NOT have the Flow ID,
- * Installer ID, and any additional matching conditions for the
- * flow entries (e.g., source or destination MAC address, etc).
- */
- public FlowPath computeFlowPath(IPortObject src_port,
- IPortObject dest_port);
-
- /**
- * Get all Flow Entries of a Flow.
- *
- * @param flow the flow whose flow entries should be returned.
- * @return the flow entries of the flow.
- */
- public Iterable<FlowEntry> getFlowEntries(FlowPath flow);
-
- /**
- * Install a Flow Entry on a switch.
- *
- * @param mySwitch the switch to install the Flow Entry into.
- * @param flowPath the flow path for the flow entry to install.
- * @param flowEntry the flow entry to install.
- * @return true on success, otherwise false.
- */
- public boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
- FlowEntry flowEntry);
-
- /**
- * Remove a Flow Entry from a switch.
- *
- * @param mySwitch the switch to remove the Flow Entry from.
- * @param flowPath the flow path for the flow entry to remove.
- * @param flowEntry the flow entry to remove.
- * @return true on success, otherwise false.
- */
- public boolean removeFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
- FlowEntry flowEntry);
-
- /**
- * Install a Flow Entry on a remote controller.
- *
- * TODO: We need it now: Jono
- * - For now it will make a REST call to the remote controller.
- * - Internally, it needs to know the name of the remote controller.
- *
- * @param flowPath the flow path for the flow entry to install.
- * @param flowEntry the flow entry to install.
- * @return true on success, otherwise false.
- */
- public boolean installRemoteFlowEntry(FlowPath flowPath,
- FlowEntry flowEntry);
-
- /**
- * Remove a flow entry on a remote controller.
- *
- * @param flowPath the flow path for the flow entry to remove.
- * @param flowEntry the flow entry to remove.
- * @return true on success, otherwise false.
- */
- public boolean removeRemoteFlowEntry(FlowPath flowPath,
- FlowEntry flowEntry);
-}
diff --git a/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java b/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
index 5388233..40f5044 100644
--- a/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
+++ b/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
@@ -1,7 +1,5 @@
package net.onrc.onos.graph;
-import net.onrc.onos.flow.FlowManagerImpl;
-import net.onrc.onos.flow.IFlowManager;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
import org.slf4j.Logger;
@@ -52,9 +50,12 @@
src_port.getNumber(),
dest_port.getSwitch().getDPID(),
dest_port.getNumber()});
- IFlowManager manager = new FlowManagerImpl();
// TODO: Find the flows and add to reconcile queue
- manager.reconcileFlows(src_port);
+ //
+ // NOTE: Old code/logic.
+ //
+ // IFlowService flowManager = ...
+ // flowManager.reconcileFlows(src_port);
}
}
@@ -81,8 +82,11 @@
IPortObject src_port = conn.getFramedGraph().frame(vertex, IPortObject.class);
log.debug("TopologyEvents: Port removed: {}:{}",src_port.getSwitch().getDPID(),src_port.getNumber());
- IFlowManager manager = new FlowManagerImpl();
- manager.reconcileFlows(src_port);
+
+ // NOTE: Old code/logic.
+ //
+ // IFlowService flowManager = ...
+ // flowManager.reconcileFlows(src_port);
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index f623541..71479a1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -25,7 +25,7 @@
/**
* Class for performing Flow-related operations on the Database.
*/
-class FlowDatabaseOperation {
+public class FlowDatabaseOperation {
private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
/**
@@ -68,6 +68,21 @@
}
//
+ // Remove the old Flow Entries
+ //
+ if (found) {
+ Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
+ LinkedList<IFlowEntry> deleteFlowEntries =
+ new LinkedList<IFlowEntry>();
+ for (IFlowEntry flowEntryObj : flowEntries)
+ deleteFlowEntries.add(flowEntryObj);
+ for (IFlowEntry flowEntryObj : deleteFlowEntries) {
+ flowObj.removeFlowEntry(flowEntryObj);
+ dbHandler.removeFlowEntry(flowEntryObj);
+ }
+ }
+
+ //
// Set the Flow key:
// - flowId
//
@@ -155,6 +170,9 @@
// flowPath.dataPath().flowEntries()
//
for (FlowEntry flowEntry : flowPath.dataPath().flowEntries()) {
+ if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE)
+ continue; // Skip: all Flow Entries were deleted earlier
+
if (addFlowEntry(flowManager, dbHandler, flowObj, flowEntry) == null) {
dbHandler.rollback();
return false;
@@ -186,15 +204,6 @@
// Flow edges
// HeadFE (TODO)
- //
- // Assign the FlowEntry ID.
- //
- if ((flowEntry.flowEntryId() == null) ||
- (flowEntry.flowEntryId().value() == 0)) {
- long id = flowManager.getNextFlowEntryId();
- flowEntry.setFlowEntryId(new FlowEntryId(id));
- }
-
IFlowEntry flowEntryObj = null;
boolean found = false;
try {
@@ -370,122 +379,6 @@
* @return true on success, otherwise false.
*/
static boolean deleteAllFlows(GraphDBOperation dbHandler) {
- final ConcurrentLinkedQueue<FlowId> concurrentAllFlowIds =
- new ConcurrentLinkedQueue<FlowId>();
-
- // Get all Flow IDs
- Iterable<IFlowPath> allFlowPaths = dbHandler.getAllFlowPaths();
- for (IFlowPath flowPathObj : allFlowPaths) {
- if (flowPathObj == null)
- continue;
- String flowIdStr = flowPathObj.getFlowId();
- if (flowIdStr == null)
- continue;
- FlowId flowId = new FlowId(flowIdStr);
- concurrentAllFlowIds.add(flowId);
- }
-
- // Delete all flows one-by-one
- for (FlowId flowId : concurrentAllFlowIds)
- deleteFlow(dbHandler, flowId);
-
- /*
- * TODO: A faster mechanism to delete the Flow Paths by using
- * a number of threads. Commented-out for now.
- */
- /*
- //
- // Create the threads to delete the Flow Paths
- //
- List<Thread> threads = new LinkedList<Thread>();
- for (int i = 0; i < 10; i++) {
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
- FlowId flowId = concurrentAllFlowIds.poll();
- if (flowId == null)
- return;
- deleteFlow(dbHandler, flowId);
- }
- }}, "Delete All Flow Paths");
- threads.add(thread);
- }
-
- // Start processing
- for (Thread thread : threads) {
- thread.start();
- }
-
- // Wait for all threads to complete
- for (Thread thread : threads) {
- try {
- thread.join();
- } catch (InterruptedException e) {
- log.debug("Exception waiting for a thread to delete a Flow Path: ", e);
- }
- }
- */
-
- return true;
- }
-
- /**
- * Delete a previously added flow.
- *
- * @param dbHandler the Graph Database handler to use.
- * @param flowId the Flow ID of the flow to delete.
- * @return true on success, otherwise false.
- */
- static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId) {
- IFlowPath flowObj = null;
- //
- // We just mark the entries for deletion,
- // and let the switches remove each individual entry after
- // it has been removed from the switches.
- //
- try {
- flowObj = dbHandler.searchFlowPath(flowId);
- } catch (Exception e) {
- // TODO: handle exceptions
- dbHandler.rollback();
- log.error(":deleteFlow FlowId:{} failed", flowId.toString());
- return false;
- }
- if (flowObj == null) {
- dbHandler.commit();
- return true; // OK: No such flow
- }
-
- //
- // Find and mark for deletion all Flow Entries,
- // and the Flow itself.
- //
- flowObj.setFlowPathUserState("FP_USER_DELETE");
- Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
- boolean empty = true; // TODO: an ugly hack
- for (IFlowEntry flowEntryObj : flowEntries) {
- empty = false;
- // flowObj.removeFlowEntry(flowEntryObj);
- // conn.utils().removeFlowEntry(conn, flowEntryObj);
- flowEntryObj.setUserState("FE_USER_DELETE");
- flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
- }
- // Remove from the database empty flows
- if (empty)
- dbHandler.removeFlowPath(flowObj);
- dbHandler.commit();
-
- return true;
- }
-
- /**
- * Clear the state for all previously added flows.
- *
- * @param dbHandler the Graph Database handler to use.
- * @return true on success, otherwise false.
- */
- static boolean clearAllFlows(GraphDBOperation dbHandler) {
List<FlowId> allFlowIds = new LinkedList<FlowId>();
// Get all Flow IDs
@@ -500,29 +393,29 @@
allFlowIds.add(flowId);
}
- // Clear all flows one-by-one
+ // Delete all flows one-by-one
for (FlowId flowId : allFlowIds) {
- clearFlow(dbHandler, flowId);
+ deleteFlow(dbHandler, flowId);
}
return true;
}
/**
- * Clear the state for a previously added flow.
+ * Delete a previously added flow.
*
* @param dbHandler the Graph Database handler to use.
- * @param flowId the Flow ID of the flow to clear.
+ * @param flowId the Flow ID of the flow to delete.
* @return true on success, otherwise false.
*/
- static boolean clearFlow(GraphDBOperation dbHandler, FlowId flowId) {
+ static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId) {
IFlowPath flowObj = null;
try {
flowObj = dbHandler.searchFlowPath(flowId);
} catch (Exception e) {
// TODO: handle exceptions
dbHandler.rollback();
- log.error(":clearFlow FlowId:{} failed", flowId.toString());
+ log.error(":deleteFlow FlowId:{} failed", flowId.toString());
return false;
}
if (flowObj == null) {
@@ -706,9 +599,9 @@
* @param maxFlows the maximum number of flows to be returned.
* @return the Flow Paths if found, otherwise null.
*/
- static ArrayList<IFlowPath> getAllFlowsSummary(GraphDBOperation dbHandler,
- FlowId flowId,
- int maxFlows) {
+ static ArrayList<FlowPath> getAllFlowsSummary(GraphDBOperation dbHandler,
+ FlowId flowId,
+ int maxFlows) {
//
// TODO: The implementation below is not optimal:
// We fetch all flows, and then return only the subset that match
@@ -716,61 +609,32 @@
// We should use the appropriate Titan/Gremlin query to filter-out
// the flows as appropriate.
//
- ArrayList<IFlowPath> flowPathsWithoutFlowEntries =
- getAllFlowsWithoutFlowEntries(dbHandler);
-
- Collections.sort(flowPathsWithoutFlowEntries,
- new Comparator<IFlowPath>() {
- @Override
- public int compare(IFlowPath first, IFlowPath second) {
- long result =
- new FlowId(first.getFlowId()).value()
- - new FlowId(second.getFlowId()).value();
- if (result > 0) {
- return 1;
- } else if (result < 0) {
- return -1;
- } else {
- return 0;
- }
- }
- }
- );
-
- return flowPathsWithoutFlowEntries;
+ ArrayList<FlowPath> flowPaths = getAllFlowsWithDataPathSummary(dbHandler);
+ Collections.sort(flowPaths);
+ return flowPaths;
}
/**
- * Get all Flows information, without the associated Flow Entries.
+ * Get all Flows information, with Data Path summary for the Flow Entries.
*
* @param dbHandler the Graph Database handler to use.
- * @return all Flows information, without the associated Flow Entries.
+ * @return all Flows information, with Data Path summary for the Flow
+ * Entries.
*/
- static ArrayList<IFlowPath> getAllFlowsWithoutFlowEntries(GraphDBOperation dbHandler) {
- Iterable<IFlowPath> flowPathsObj = null;
- ArrayList<IFlowPath> flowPathsObjArray = new ArrayList<IFlowPath>();
+ static ArrayList<FlowPath> getAllFlowsWithDataPathSummary(GraphDBOperation dbHandler) {
+ ArrayList<FlowPath> flowPaths = getAllFlows(dbHandler);
- // TODO: Remove this op.commit() flow, because it is not needed?
- dbHandler.commit();
+ // Truncate each Flow Path and Flow Entry
+ for (FlowPath flowPath : flowPaths) {
+ flowPath.setFlowEntryMatch(null);
+ flowPath.setFlowEntryActions(null);
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ flowEntry.setFlowEntryMatch(null);
+ flowEntry.setFlowEntryActions(null);
+ }
+ }
- try {
- flowPathsObj = dbHandler.getAllFlowPaths();
- } catch (Exception e) {
- // TODO: handle exceptions
- dbHandler.rollback();
- log.error(":getAllFlowPaths failed");
- return flowPathsObjArray; // No Flows found
- }
- if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
- return flowPathsObjArray; // No Flows found
- }
-
- for (IFlowPath flowObj : flowPathsObj)
- flowPathsObjArray.add(flowObj);
-
- // conn.endTx(Transaction.COMMIT);
-
- return flowPathsObjArray;
+ return flowPaths;
}
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 0e9887a..feb80e1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -58,9 +58,6 @@
private FlowManager flowManager; // The Flow Manager to use
private IDatagridService datagridService; // The Datagrid Service to use
private Topology topology; // The network topology
- private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
- private Map<Long, FlowEntry> unmatchedFlowEntryAdd =
- new HashMap<Long, FlowEntry>();
// The queue with Flow Path and Topology Element updates
private BlockingQueue<EventEntry<?>> networkEvents =
@@ -74,21 +71,22 @@
private List<EventEntry<FlowEntry>> flowEntryEvents =
new LinkedList<EventEntry<FlowEntry>>();
+ // All internally computed Flow Paths
+ private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
+
+ // The Flow Entries received as notifications with unmatched Flow Paths
+ private Map<Long, FlowEntry> unmatchedFlowEntryAdd =
+ new HashMap<Long, FlowEntry>();
+
//
// Transient state for processing the Flow Paths:
- // - The new Flow Paths
// - The Flow Paths that should be recomputed
// - The Flow Paths with modified Flow Entries
- // - The Flow Entries that were updated
//
- private List<FlowPath> newFlowPaths = new LinkedList<FlowPath>();
- private List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
- private List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
- private List<FlowPathEntryPair> updatedFlowEntries =
- new LinkedList<FlowPathEntryPair>();
- private List<FlowPathEntryPair> unmatchedDeleteFlowEntries =
- new LinkedList<FlowPathEntryPair>();
-
+ private Map<Long, FlowPath> shouldRecomputeFlowPaths =
+ new HashMap<Long, FlowPath>();
+ private Map<Long, FlowPath> modifiedFlowPaths =
+ new HashMap<Long, FlowPath>();
/**
* Constructor for a given Flow Manager and Datagrid Service.
@@ -171,18 +169,28 @@
// - EventEntry<FlowEntry>
//
for (EventEntry<?> event : collection) {
+ // Topology event
if (event.eventData() instanceof TopologyElement) {
EventEntry<TopologyElement> topologyEventEntry =
(EventEntry<TopologyElement>)event;
topologyEvents.add(topologyEventEntry);
- } else if (event.eventData() instanceof FlowPath) {
+ continue;
+ }
+
+ // FlowPath event
+ if (event.eventData() instanceof FlowPath) {
EventEntry<FlowPath> flowPathEventEntry =
(EventEntry<FlowPath>)event;
flowPathEvents.add(flowPathEventEntry);
- } else if (event.eventData() instanceof FlowEntry) {
+ continue;
+ }
+
+ // FlowEntry event
+ if (event.eventData() instanceof FlowEntry) {
EventEntry<FlowEntry> flowEntryEventEntry =
(EventEntry<FlowEntry>)event;
flowEntryEvents.add(flowEntryEventEntry);
+ continue;
}
}
collection.clear();
@@ -208,22 +216,15 @@
processFlowPathEvents();
processTopologyEvents();
- //
- // Add all new Flows: should be done after processing the Flow Path
- // and Topology events.
- //
- for (FlowPath flowPath : newFlowPaths) {
- allFlowPaths.put(flowPath.flowId().value(), flowPath);
- }
-
processFlowEntryEvents();
// Recompute all affected Flow Paths and keep only the modified
- for (FlowPath flowPath : recomputeFlowPaths) {
+ for (FlowPath flowPath : shouldRecomputeFlowPaths.values()) {
if (recomputeFlowPath(flowPath))
- modifiedFlowPaths.add(flowPath);
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
}
+ // Extract the modified Flow Entries
modifiedFlowEntries = extractModifiedFlowEntries(modifiedFlowPaths);
// Assign missing Flow Entry IDs
@@ -234,14 +235,12 @@
//
flowManager.pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
flowManager.pushModifiedFlowEntriesToDatagrid(modifiedFlowEntries);
- flowManager.pushModifiedFlowEntriesToDatabase(modifiedFlowEntries);
- flowManager.pushModifiedFlowEntriesToDatabase(updatedFlowEntries);
- flowManager.pushModifiedFlowEntriesToDatabase(unmatchedDeleteFlowEntries);
+ flowManager.pushModifiedFlowPathsToDatabase(modifiedFlowPaths.values());
//
// Remove Flow Entries that were deleted
//
- for (FlowPath flowPath : modifiedFlowPaths)
+ for (FlowPath flowPath : modifiedFlowPaths.values())
flowPath.dataPath().removeDeletedFlowEntries();
// Cleanup
@@ -249,23 +248,20 @@
flowPathEvents.clear();
flowEntryEvents.clear();
//
- newFlowPaths.clear();
- recomputeFlowPaths.clear();
+ shouldRecomputeFlowPaths.clear();
modifiedFlowPaths.clear();
- updatedFlowEntries.clear();
- unmatchedDeleteFlowEntries.clear();
}
/**
* Extract the modified Flow Entries.
*/
private List<FlowPathEntryPair> extractModifiedFlowEntries(
- List<FlowPath> modifiedFlowPaths) {
+ Map<Long, FlowPath> modifiedFlowPaths) {
List<FlowPathEntryPair> modifiedFlowEntries =
new LinkedList<FlowPathEntryPair>();
// Extract only the modified Flow Entries
- for (FlowPath flowPath : modifiedFlowPaths) {
+ for (FlowPath flowPath : modifiedFlowPaths.values()) {
for (FlowEntry flowEntry : flowPath.flowEntries()) {
if (flowEntry.flowEntrySwitchState() ==
FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
@@ -324,10 +320,8 @@
if (allFlowPaths.get(flowPath.flowId().value()) != null) {
//
// TODO: What to do if the Flow Path already exists?
- // Remove and then re-add it, or merge the info?
- // For now, we don't have to do anything.
+ // Fow now, we just re-add it.
//
- break;
}
switch (flowPath.flowPathType()) {
@@ -337,7 +331,8 @@
// we are going to recompute it anyway.
//
flowPath.flowEntries().clear();
- recomputeFlowPaths.add(flowPath);
+ shouldRecomputeFlowPaths.put(flowPath.flowId().value(),
+ flowPath);
break;
case FP_TYPE_EXPLICIT_PATH:
//
@@ -346,10 +341,10 @@
for (FlowEntry flowEntry : flowPath.flowEntries()) {
flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
}
- modifiedFlowPaths.add(flowPath);
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
break;
}
- newFlowPaths.add(flowPath);
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
break;
}
@@ -372,8 +367,11 @@
flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
}
- allFlowPaths.remove(existingFlowPath.flowId().value());
- modifiedFlowPaths.add(existingFlowPath);
+ // Remove the Flow Path from the internal state
+ Long key = existingFlowPath.flowId().value();
+ allFlowPaths.remove(key);
+ shouldRecomputeFlowPaths.remove(key);
+ modifiedFlowPaths.put(key, existingFlowPath);
break;
}
@@ -406,7 +404,7 @@
}
if (isTopologyModified) {
// TODO: For now, if the topology changes, we recompute all Flows
- recomputeFlowPaths.addAll(allFlowPaths.values());
+ shouldRecomputeFlowPaths.putAll(allFlowPaths);
}
}
@@ -434,7 +432,7 @@
continue;
}
flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
- updatedFlowEntries.add(flowPair);
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
}
unmatchedFlowEntryAdd = remainingUpdates;
}
@@ -471,15 +469,15 @@
}
// Add the updated entry to the list of updated Flow Entries
flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
- updatedFlowEntries.add(flowPair);
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
break;
case ENTRY_REMOVE:
flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
if (unmatchedFlowEntryAdd.remove(flowEntry.flowEntryId().value()) != null) {
- continue; // Match found
+ continue; // Removed previously unmatched entry
}
-
+
flowPath = allFlowPaths.get(flowEntry.flowId().value());
if (flowPath == null) {
// Flow Path not found: ignore the update
@@ -487,13 +485,11 @@
}
updatedFlowEntry = updateFlowEntryRemove(flowPath, flowEntry);
if (updatedFlowEntry == null) {
- // Flow Entry not found: add to list of deleted entries
- flowPair = new FlowPathEntryPair(flowPath, flowEntry);
- unmatchedDeleteFlowEntries.add(flowPair);
+ // Flow Entry not found: ignore it
break;
}
flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
- updatedFlowEntries.add(flowPair);
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
break;
}
}
@@ -710,9 +706,18 @@
//
newFlowEntry.setFlowId(new FlowId(flowPath.flowId().value()));
- // Set the incoming port matching
- FlowEntryMatch flowEntryMatch = new FlowEntryMatch();
+ //
+ // Allocate the FlowEntryMatch by copying the default one
+ // from the FlowPath (if set).
+ //
+ FlowEntryMatch flowEntryMatch = null;
+ if (flowPath.flowEntryMatch() != null)
+ flowEntryMatch = new FlowEntryMatch(flowPath.flowEntryMatch());
+ else
+ flowEntryMatch = new FlowEntryMatch();
newFlowEntry.setFlowEntryMatch(flowEntryMatch);
+
+ // Set the incoming port matching
flowEntryMatch.enableInPort(newFlowEntry.inPort());
//
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 174d5d7..7f43fe9 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -2,12 +2,12 @@
import java.util.ArrayList;
import java.util.Collection;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.Executors;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -18,7 +18,6 @@
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
-import net.floodlightcontroller.util.OFMessageDamper;
import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.graph.GraphDBOperation;
import net.onrc.onos.ofcontroller.core.INetMapStorage;
@@ -27,7 +26,6 @@
import net.onrc.onos.ofcontroller.floodlightlistener.INetworkGraphService;
import net.onrc.onos.ofcontroller.flowmanager.web.FlowWebRoutable;
import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
-import net.onrc.onos.ofcontroller.topology.ITopologyNetService;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.util.*;
@@ -39,21 +37,10 @@
* Flow Manager class for handling the network flows.
*/
public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
-
- //
- // TODO: A temporary variable to switch between the poll-based and
- // notification mechanism for the Flow Manager.
- //
- private final static boolean enableNotifications = true;
-
- // flag to use FlowPusher instead of FlowSwitchOperation/MessageDamper
- private final static boolean enableFlowPusher = false;
-
protected GraphDBOperation dbHandlerApi;
protected GraphDBOperation dbHandlerInner;
protected volatile IFloodlightProviderService floodlightProvider;
- protected volatile ITopologyNetService topologyNetService;
protected volatile IDatagridService datagridService;
protected IRestApiService restApi;
protected FloodlightModuleContext context;
@@ -61,322 +48,18 @@
protected IFlowPusherService pusher;
- protected OFMessageDamper messageDamper;
-
- //
- // TODO: Values copied from elsewhere (class LearningSwitch).
- // The local copy should go away!
- //
- protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
- protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
-
// Flow Entry ID generation state
private static Random randomGenerator = new Random();
private static int nextFlowEntryIdPrefix = 0;
private static int nextFlowEntryIdSuffix = 0;
- private static long nextFlowEntryId = 0;
/** The logger. */
private final static Logger log = LoggerFactory.getLogger(FlowManager.class);
- // The periodic task(s)
- private ScheduledExecutorService mapReaderScheduler;
- private ScheduledExecutorService shortestPathReconcileScheduler;
-
- /**
- * Periodic task for reading the Flow Entries and pushing changes
- * into the switches.
- */
- final Runnable mapReader = new Runnable() {
- public void run() {
- try {
- runImpl();
- } catch (Exception e) {
- log.debug("Exception processing All Flow Entries from the Network MAP: ", e);
- dbHandlerInner.rollback();
- return;
- }
- }
-
- private void runImpl() {
- long startTime = System.nanoTime();
- int counterAllFlowEntries = 0;
- int counterMyNotUpdatedFlowEntries = 0;
-
- if (floodlightProvider == null) {
- log.debug("FloodlightProvider service not found!");
- return;
- }
- Map<Long, IOFSwitch> mySwitches =
- floodlightProvider.getSwitches();
- if (mySwitches.isEmpty()) {
- log.trace("No switches controlled");
- return;
- }
- LinkedList<IFlowEntry> addFlowEntries =
- new LinkedList<IFlowEntry>();
- LinkedList<IFlowEntry> deleteFlowEntries =
- new LinkedList<IFlowEntry>();
-
- //
- // Fetch all Flow Entries which need to be updated and select
- // only my Flow Entries that need to be updated into the
- // switches.
- //
- Iterable<IFlowEntry> allFlowEntries =
- dbHandlerInner.getAllSwitchNotUpdatedFlowEntries();
- for (IFlowEntry flowEntryObj : allFlowEntries) {
- log.debug("flowEntryobj : {}", flowEntryObj);
-
- counterAllFlowEntries++;
-
- String dpidStr = flowEntryObj.getSwitchDpid();
- if (dpidStr == null)
- continue;
- Dpid dpid = new Dpid(dpidStr);
- IOFSwitch mySwitch = mySwitches.get(dpid.value());
- if (mySwitch == null)
- continue; // Ignore the entry: not my switch
-
- IFlowPath flowObj =
- dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null)
- continue; // Should NOT happen
- if (flowObj.getFlowId() == null)
- continue; // Invalid entry
-
- //
- // NOTE: For now we process the DELETE before the ADD
- // to cover the more common scenario.
- // TODO: This is error prone and needs to be fixed!
- //
- String userState = flowEntryObj.getUserState();
- if (userState == null)
- continue;
- if (userState.equals("FE_USER_DELETE")) {
- // An entry that needs to be deleted.
- deleteFlowEntries.add(flowEntryObj);
- installFlowEntry(mySwitch, flowObj, flowEntryObj);
- } else {
- addFlowEntries.add(flowEntryObj);
- }
- counterMyNotUpdatedFlowEntries++;
- }
-
- log.debug("addFlowEntries : {}", addFlowEntries);
-
- //
- // Process the Flow Entries that need to be added
- //
- for (IFlowEntry flowEntryObj : addFlowEntries) {
- IFlowPath flowObj =
- dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null)
- continue; // Should NOT happen
- if (flowObj.getFlowId() == null)
- continue; // Invalid entry
-
- Dpid dpid = new Dpid(flowEntryObj.getSwitchDpid());
- IOFSwitch mySwitch = mySwitches.get(dpid.value());
- if (mySwitch == null)
- continue; // Shouldn't happen
- installFlowEntry(mySwitch, flowObj, flowEntryObj);
- }
-
- //
- // Delete all Flow Entries marked for deletion from the
- // Network MAP.
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and delete the Flow Entries after the
- // Barrier message is received.
- //
- while (! deleteFlowEntries.isEmpty()) {
- IFlowEntry flowEntryObj = deleteFlowEntries.poll();
- IFlowPath flowObj =
- dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null) {
- log.debug("Did not find FlowPath to be deleted");
- continue;
- }
- flowObj.removeFlowEntry(flowEntryObj);
- dbHandlerInner.removeFlowEntry(flowEntryObj);
- }
-
- dbHandlerInner.commit();
-
- long estimatedTime = System.nanoTime() - startTime;
- double rate = 0.0;
- if (estimatedTime > 0)
- rate = ((double)counterAllFlowEntries * 1000000000) / estimatedTime;
- String logMsg = "MEASUREMENT: Processed AllFlowEntries: " +
- counterAllFlowEntries + " MyNotUpdatedFlowEntries: " +
- counterMyNotUpdatedFlowEntries + " in " +
- (double)estimatedTime / 1000000000 + " sec: " +
- rate + " paths/s";
- log.debug(logMsg);
- }
- };
-
- /**
- * Periodic task for reading the Flow Paths and recomputing the
- * shortest paths.
- */
- final Runnable shortestPathReconcile = new Runnable() {
- public void run() {
- try {
- runImpl();
- } catch (Exception e) {
- log.debug("Exception processing All Flows from the Network MAP: ", e);
- dbHandlerInner.rollback();
- return;
- }
- }
-
- private void runImpl() {
- long startTime = System.nanoTime();
- int counterAllFlowPaths = 0;
- int counterMyFlowPaths = 0;
-
- if (floodlightProvider == null) {
- log.debug("FloodlightProvider service not found!");
- return;
- }
- Map<Long, IOFSwitch> mySwitches =
- floodlightProvider.getSwitches();
- if (mySwitches.isEmpty()) {
- log.trace("No switches controlled");
- return;
- }
- LinkedList<IFlowPath> deleteFlows = new LinkedList<IFlowPath>();
-
- //
- // Fetch and recompute the Shortest Path for those
- // Flow Paths this controller is responsible for.
- //
- Topology topology = topologyNetService.newDatabaseTopology();
- Iterable<IFlowPath> allFlowPaths = dbHandlerInner.getAllFlowPaths();
- for (IFlowPath flowPathObj : allFlowPaths) {
- counterAllFlowPaths++;
- if (flowPathObj == null)
- continue;
-
- String srcDpidStr = flowPathObj.getSrcSwitch();
- if (srcDpidStr == null)
- continue;
- Dpid srcDpid = new Dpid(srcDpidStr);
- //
- // Use the source DPID as a heuristic to decide
- // which controller is responsible for maintaining the
- // shortest path.
- // NOTE: This heuristic is error-prone: if the switch
- // goes away and no controller is responsible for that
- // switch, then the original Flow Path is not cleaned-up
- //
- IOFSwitch mySwitch = mySwitches.get(srcDpid.value());
- if (mySwitch == null)
- continue; // Ignore: not my responsibility
-
- // Test whether we need to maintain this flow
- String flowPathTypeStr = flowPathObj.getFlowPathType();
- if (flowPathTypeStr == null)
- continue; // Could be invalid entry?
- if (! flowPathTypeStr.equals("FP_TYPE_SHORTEST_PATH"))
- continue; // No need to maintain this flow
-
- //
- // Test whether we need to complete the Flow cleanup,
- // if the Flow has been deleted by the user.
- //
- String flowPathUserStateStr = flowPathObj.getFlowPathUserState();
- if ((flowPathUserStateStr != null)
- && flowPathUserStateStr.equals("FP_USER_DELETE")) {
- Iterable<IFlowEntry> flowEntries = flowPathObj.getFlowEntries();
- final boolean empty = !flowEntries.iterator().hasNext();
- if (empty)
- deleteFlows.add(flowPathObj);
- }
-
- // Fetch the fields needed to recompute the shortest path
- String dataPathSummaryStr = flowPathObj.getDataPathSummary();
- Short srcPortShort = flowPathObj.getSrcPort();
- String dstDpidStr = flowPathObj.getDstSwitch();
- Short dstPortShort = flowPathObj.getDstPort();
- Long flowPathFlagsLong = flowPathObj.getFlowPathFlags();
- if ((dataPathSummaryStr == null) ||
- (srcPortShort == null) ||
- (dstDpidStr == null) ||
- (dstPortShort == null) ||
- (flowPathFlagsLong == null)) {
- continue;
- }
-
- Port srcPort = new Port(srcPortShort);
- Dpid dstDpid = new Dpid(dstDpidStr);
- Port dstPort = new Port(dstPortShort);
- SwitchPort srcSwitchPort = new SwitchPort(srcDpid, srcPort);
- SwitchPort dstSwitchPort = new SwitchPort(dstDpid, dstPort);
- FlowPathType flowPathType = FlowPathType.valueOf(flowPathTypeStr);
- FlowPathUserState flowPathUserState = FlowPathUserState.valueOf(flowPathUserStateStr);
- FlowPathFlags flowPathFlags = new FlowPathFlags(flowPathFlagsLong);
-
- counterMyFlowPaths++;
-
- //
- // NOTE: Using here the regular getDatabaseShortestPath()
- // method won't work here, because that method calls
- // internally "conn.endTx(Transaction.COMMIT)", and that
- // will invalidate all handlers to the Titan database.
- // If we want to experiment with calling here
- // getDatabaseShortestPath(), we need to refactor that code
- // to avoid closing the transaction.
- //
- DataPath dataPath =
- topologyNetService.getTopologyShortestPath(
- topology,
- srcSwitchPort,
- dstSwitchPort);
- if (dataPath == null) {
- // We need the DataPath to compare the paths
- dataPath = new DataPath();
- dataPath.setSrcPort(srcSwitchPort);
- dataPath.setDstPort(dstSwitchPort);
- }
- dataPath.applyFlowPathFlags(flowPathFlags);
-
- String newDataPathSummaryStr = dataPath.dataPathSummary();
- if (dataPathSummaryStr.equals(newDataPathSummaryStr))
- continue; // Nothing changed
-
- reconcileFlow(flowPathObj, dataPath);
- }
-
- //
- // Delete all leftover Flows marked for deletion from the
- // Network MAP.
- //
- while (! deleteFlows.isEmpty()) {
- IFlowPath flowPathObj = deleteFlows.poll();
- dbHandlerInner.removeFlowPath(flowPathObj);
- }
-
- topologyNetService.dropTopology(topology);
-
- dbHandlerInner.commit();
-
- long estimatedTime = System.nanoTime() - startTime;
- double rate = 0.0;
- if (estimatedTime > 0)
- rate = ((double)counterAllFlowPaths * 1000000000) / estimatedTime;
- String logMsg = "MEASUREMENT: Processed AllFlowPaths: " +
- counterAllFlowPaths + " MyFlowPaths: " +
- counterMyFlowPaths + " in " +
- (double)estimatedTime / 1000000000 + " sec: " +
- rate + " paths/s";
- log.debug(logMsg);
- }
- };
-
+ // The queue to write Flow Entries to the database
+ private BlockingQueue<FlowPath> flowPathsToDatabaseQueue =
+ new LinkedBlockingQueue<FlowPath>();
+ FlowDatabaseWriter flowDatabaseWriter;
/**
* Initialize the Flow Manager.
@@ -462,22 +145,11 @@
throws FloodlightModuleException {
this.context = context;
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
- topologyNetService = context.getServiceImpl(ITopologyNetService.class);
datagridService = context.getServiceImpl(IDatagridService.class);
restApi = context.getServiceImpl(IRestApiService.class);
-
- if (enableFlowPusher) {
- pusher = context.getServiceImpl(IFlowPusherService.class);
- } else {
- messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
- EnumSet.of(OFType.FLOW_MOD),
- OFMESSAGE_DAMPER_TIMEOUT);
- }
+ pusher = context.getServiceImpl(IFlowPusherService.class);
this.init("");
-
- mapReaderScheduler = Executors.newScheduledThreadPool(1);
- shortestPathReconcileScheduler = Executors.newScheduledThreadPool(1);
}
/**
@@ -515,22 +187,23 @@
// Initialize the Flow Entry ID generator
nextFlowEntryIdPrefix = randomGenerator.nextInt();
-
+
//
- // Create the Flow Event Handler thread and register it with the
- // Datagrid Service
+ // The thread to write to the database
+ //
+ flowDatabaseWriter = new FlowDatabaseWriter(this,
+ flowPathsToDatabaseQueue);
+ flowDatabaseWriter.start();
+
+ //
+ // The Flow Event Handler thread:
+ // - create
+ // - register with the Datagrid Service
+ // - startup
//
flowEventHandler = new FlowEventHandler(this, datagridService);
datagridService.registerFlowEventHandlerService(flowEventHandler);
-
- // Schedule the threads and periodic tasks
flowEventHandler.start();
- if (! enableNotifications) {
- mapReaderScheduler.scheduleAtFixedRate(
- mapReader, 3, 3, TimeUnit.SECONDS);
- shortestPathReconcileScheduler.scheduleAtFixedRate(
- shortestPathReconcile, 3, 3, TimeUnit.SECONDS);
- }
}
/**
@@ -593,19 +266,11 @@
*/
@Override
public boolean deleteAllFlows() {
- //
- // TODO: In the notification-based implementation,
- // deleteFlow() is implemented by using clearFlow()
- //
- return clearAllFlows();
-
- /*
if (FlowDatabaseOperation.deleteAllFlows(dbHandlerApi)) {
datagridService.notificationSendAllFlowsRemoved();
return true;
}
return false;
- */
}
/**
@@ -616,47 +281,11 @@
*/
@Override
public boolean deleteFlow(FlowId flowId) {
- //
- // TODO: In the notification-based implementation,
- // deleteFlow() is implemented by using clearFlow()
- //
- return clearFlow(flowId);
- /*
if (FlowDatabaseOperation.deleteFlow(dbHandlerApi, flowId)) {
datagridService.notificationSendFlowRemoved(flowId);
return true;
}
return false;
- */
- }
-
- /**
- * Clear the state for all previously added flows.
- *
- * @return true on success, otherwise false.
- */
- @Override
- public boolean clearAllFlows() {
- if (FlowDatabaseOperation.clearAllFlows(dbHandlerApi)) {
- datagridService.notificationSendAllFlowsRemoved();
- return true;
- }
- return false;
- }
-
- /**
- * Clear the state for a previously added flow.
- *
- * @param flowId the Flow ID of the flow to clear.
- * @return true on success, otherwise false.
- */
- @Override
- public boolean clearFlow(FlowId flowId) {
- if (FlowDatabaseOperation.clearFlow(dbHandlerApi, flowId)) {
- datagridService.notificationSendFlowRemoved(flowId);
- return true;
- }
- return false;
}
/**
@@ -715,22 +344,13 @@
* @return the Flow Paths if found, otherwise null.
*/
@Override
- public ArrayList<IFlowPath> getAllFlowsSummary(FlowId flowId,
- int maxFlows) {
+ public ArrayList<FlowPath> getAllFlowsSummary(FlowId flowId,
+ int maxFlows) {
return FlowDatabaseOperation.getAllFlowsSummary(dbHandlerApi, flowId,
maxFlows);
}
/**
- * Get all Flows information, without the associated Flow Entries.
- *
- * @return all Flows information, without the associated Flow Entries.
- */
- public ArrayList<IFlowPath> getAllFlowsWithoutFlowEntries() {
- return FlowDatabaseOperation.getAllFlowsWithoutFlowEntries(dbHandlerApi);
- }
-
- /**
* Add and maintain a shortest-path flow.
*
* NOTE: The Flow Path argument does NOT contain flow entries.
@@ -772,139 +392,13 @@
}
/**
- * Reconcile a flow.
+ * Inform the Flow Manager that a Flow Entry on switch expired.
*
- * @param flowObj the flow that needs to be reconciled.
- * @param newDataPath the new data path to use.
- * @return true on success, otherwise false.
+ * @param sw the switch the Flow Entry expired on.
+ * @param flowEntryId the Flow Entry ID of the expired Flow Entry.
*/
- private boolean reconcileFlow(IFlowPath flowObj, DataPath newDataPath) {
- String flowIdStr = flowObj.getFlowId();
-
- //
- // Set the incoming port matching and the outgoing port output
- // actions for each flow entry.
- //
- int idx = 0;
- for (FlowEntry flowEntry : newDataPath.flowEntries()) {
- flowEntry.setFlowId(new FlowId(flowIdStr));
-
- // Mark the Flow Entry as not updated in the switch
- flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
- // Set the incoming port matching
- FlowEntryMatch flowEntryMatch = new FlowEntryMatch();
- flowEntry.setFlowEntryMatch(flowEntryMatch);
- flowEntryMatch.enableInPort(flowEntry.inPort());
-
- //
- // Set the actions
- //
- FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- //
- // If the first Flow Entry, copy the Flow Path actions to it
- //
- if (idx == 0) {
- String actionsStr = flowObj.getActions();
- if (actionsStr != null) {
- FlowEntryActions flowActions = new FlowEntryActions(actionsStr);
- for (FlowEntryAction action : flowActions.actions())
- flowEntryActions.addAction(action);
- }
- }
- idx++;
- //
- // Add the outgoing port output action
- //
- FlowEntryAction flowEntryAction = new FlowEntryAction();
- flowEntryAction.setActionOutput(flowEntry.outPort());
- flowEntryActions.addAction(flowEntryAction);
- }
-
- //
- // Remove the old Flow Entries, and add the new Flow Entries
- //
- Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
- for (IFlowEntry flowEntryObj : flowEntries) {
- flowEntryObj.setUserState("FE_USER_DELETE");
- flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
- }
- for (FlowEntry flowEntry : newDataPath.flowEntries()) {
- addFlowEntry(flowObj, flowEntry);
- }
-
- //
- // Set the Data Path Summary
- //
- String dataPathSummaryStr = newDataPath.dataPathSummary();
- flowObj.setDataPathSummary(dataPathSummaryStr);
-
- return true;
- }
-
- /**
- * Reconcile all flows in a set.
- *
- * @param flowObjSet the set of flows that need to be reconciliated.
- */
- private void reconcileFlows(Iterable<IFlowPath> flowObjSet) {
- if (! flowObjSet.iterator().hasNext())
- return;
- // TODO: Not implemented/used yet.
- }
-
- /**
- * Install a Flow Entry on a switch.
- *
- * @param mySwitch the switch to install the Flow Entry into.
- * @param flowObj the flow path object for the flow entry to install.
- * @param flowEntryObj the flow entry object to install.
- * @return true on success, otherwise false.
- */
- private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
- IFlowEntry flowEntryObj) {
- if (enableFlowPusher) {
- return pusher.add(mySwitch, flowObj, flowEntryObj);
- } else {
- return FlowSwitchOperation.installFlowEntry(
- floodlightProvider.getOFMessageFactory(),
- messageDamper, mySwitch, flowObj, flowEntryObj);
- }
- }
-
- /**
- * Install a Flow Entry on a switch.
- *
- * @param mySwitch the switch to install the Flow Entry into.
- * @param flowPath the flow path for the flow entry to install.
- * @param flowEntry the flow entry to install.
- * @return true on success, otherwise false.
- */
- private boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
- FlowEntry flowEntry) {
- if (enableFlowPusher) {
- return pusher.add(mySwitch, flowPath, flowEntry);
- } else {
- return FlowSwitchOperation.installFlowEntry(
- floodlightProvider.getOFMessageFactory(),
- messageDamper, mySwitch, flowPath, flowEntry);
- }
- }
-
- /**
- * Remove a Flow Entry from a switch.
- *
- * @param mySwitch the switch to remove the Flow Entry from.
- * @param flowPath the flow path for the flow entry to remove.
- * @param flowEntry the flow entry to remove.
- * @return true on success, otherwise false.
- */
- private boolean removeFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
- FlowEntry flowEntry) {
- //
- // The installFlowEntry() method implements both installation
- // and removal of flow entries.
- //
- return (installFlowEntry(mySwitch, flowPath, flowEntry));
+ public void flowEntryOnSwitchExpired(IOFSwitch sw, FlowEntryId flowEntryId) {
+ // TODO: Not implemented yet
}
/**
@@ -917,17 +411,12 @@
*/
public void pushModifiedFlowEntriesToSwitches(
Collection<FlowPathEntryPair> modifiedFlowEntries) {
- // TODO: For now, the pushing of Flow Entries is disabled
- if (! enableNotifications)
- return;
-
if (modifiedFlowEntries.isEmpty())
return;
Map<Long, IOFSwitch> mySwitches = getMySwitches();
for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
- FlowPath flowPath = flowPair.flowPath;
FlowEntry flowEntry = flowPair.flowEntry;
IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
@@ -937,12 +426,12 @@
log.debug("Pushing Flow Entry To Switch: {}", flowEntry.toString());
//
- // Install the Flow Entry into the switch
+ // Push the Flow Entry into the switch
//
- if (! installFlowEntry(mySwitch, flowPath, flowEntry)) {
+ if (! pusher.add(mySwitch, flowEntry)) {
String logMsg = "Cannot install Flow Entry " +
flowEntry.flowEntryId() +
- " from Flow Path " + flowPath.flowId() +
+ " from Flow Path " + flowEntry.flowId() +
" on switch " + flowEntry.dpid();
log.error(logMsg);
continue;
@@ -963,10 +452,6 @@
*/
public void pushModifiedFlowEntriesToDatagrid(
Collection<FlowPathEntryPair> modifiedFlowEntries) {
- // TODO: For now, the pushing of Flow Entries is disabled
- if (! enableNotifications)
- return;
-
if (modifiedFlowEntries.isEmpty())
return;
@@ -975,6 +460,9 @@
for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
FlowEntry flowEntry = flowPair.flowEntry;
+ if (! flowEntry.isValidFlowEntryId())
+ continue;
+
IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
//
@@ -993,8 +481,6 @@
FlowEntryUserState.FE_USER_DELETE) {
continue;
}
- if (! flowEntry.isValidFlowEntryId())
- continue;
}
log.debug("Pushing Flow Entry To Datagrid: {}", flowEntry.toString());
@@ -1020,32 +506,84 @@
}
/**
- * Push Flow Entries to the Network MAP.
- *
- * NOTE: The Flow Entries are pushed only on the instance responsible
- * for the first switch. This is to avoid database errors when multiple
- * instances are writing Flow Entries for the same Flow Path.
- *
- * @param modifiedFlowEntries the collection of Flow Entries to push.
+ * Class to implement writing to the database in a separate thread.
*/
- public void pushModifiedFlowEntriesToDatabase(
- Collection<FlowPathEntryPair> modifiedFlowEntries) {
- // TODO: For now, the pushing of Flow Entries is disabled
- if (! enableNotifications)
+ class FlowDatabaseWriter extends Thread {
+ private FlowManager flowManager;
+ private BlockingQueue<FlowPath> blockingQueue;
+
+ /**
+ * Constructor.
+ *
+ * @param flowManager the Flow Manager to use.
+ * @param blockingQueue the blocking queue to use.
+ */
+ FlowDatabaseWriter(FlowManager flowManager,
+ BlockingQueue<FlowPath> blockingQueue) {
+ this.flowManager = flowManager;
+ this.blockingQueue = blockingQueue;
+ }
+
+ /**
+ * Run the thread.
+ */
+ @Override
+ public void run() {
+ //
+ // The main loop
+ //
+ Collection<FlowPath> collection = new LinkedList<FlowPath>();
+ try {
+ while (true) {
+ FlowPath flowPath = blockingQueue.take();
+ collection.add(flowPath);
+ blockingQueue.drainTo(collection);
+ flowManager.writeModifiedFlowPathsToDatabase(collection);
+ collection.clear();
+ }
+ } catch (Exception exception) {
+ log.debug("Exception writing to the Database: ", exception);
+ }
+ }
+ }
+
+ /**
+ * Push Flow Paths to the Network MAP.
+ *
+ * NOTE: The complete Flow Paths are pushed only on the instance
+ * responsible for the first switch. This is to avoid database errors
+ * when multiple instances are writing Flow Entries for the same Flow Path.
+ *
+ * @param modifiedFlowPaths the collection of Flow Paths to push.
+ */
+ void pushModifiedFlowPathsToDatabase(
+ Collection<FlowPath> modifiedFlowPaths) {
+ //
+ // We only add the Flow Paths to the Database Queue.
+ // The FlowDatabaseWriter thread is responsible for the actual writing.
+ //
+ flowPathsToDatabaseQueue.addAll(modifiedFlowPaths);
+ }
+
+ /**
+ * Write Flow Paths to the Network MAP.
+ *
+ * NOTE: The complete Flow Paths are pushed only on the instance
+ * responsible for the first switch. This is to avoid database errors
+ * when multiple instances are writing Flow Entries for the same Flow Path.
+ *
+ * @param modifiedFlowPaths the collection of Flow Paths to write.
+ */
+ private void writeModifiedFlowPathsToDatabase(
+ Collection<FlowPath> modifiedFlowPaths) {
+ if (modifiedFlowPaths.isEmpty())
return;
- if (modifiedFlowEntries.isEmpty())
- return;
+ FlowId dummyFlowId = new FlowId();
Map<Long, IOFSwitch> mySwitches = getMySwitches();
- for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
- FlowPath flowPath = flowPair.flowPath;
- FlowEntry flowEntry = flowPair.flowEntry;
-
- if (! flowEntry.isValidFlowEntryId())
- continue;
-
+ for (FlowPath flowPath : modifiedFlowPaths) {
//
// Push the changes only on the instance responsible for the
// first switch.
@@ -1055,69 +593,37 @@
if (mySrcSwitch == null)
continue;
- log.debug("Pushing Flow Entry To Database: {}", flowEntry.toString());
//
- // Write the Flow Entry to the Network Map
+ // Test whether all Flow Entries are valid
//
- // NOTE: We try a number of times, in case somehow some other
- // instances are writing at the same time.
- // Apparently, if other instances are writing at the same time
- // this will trigger an error.
- //
- for (int i = 0; i < 6; i++) {
- try {
- //
- // Find the Flow Path in the Network MAP.
- //
- // NOTE: The Flow Path might not be found if the Flow was
- // just removed by some other controller instance.
- //
- IFlowPath flowObj =
- dbHandlerInner.searchFlowPath(flowEntry.flowId());
- if (flowObj == null) {
- String logMsg = "Cannot find Network MAP entry for Flow Path " + flowEntry.flowId();
- log.error(logMsg);
- break;
- }
-
- // Write the Flow Entry
- switch (flowEntry.flowEntryUserState()) {
- case FE_USER_ADD:
- // FALLTHROUGH
- case FE_USER_MODIFY:
- if (addFlowEntry(flowObj, flowEntry) == null) {
- String logMsg = "Cannot write to Network MAP Flow Entry " +
- flowEntry.flowEntryId() +
- " from Flow Path " + flowEntry.flowId() +
- " on switch " + flowEntry.dpid();
- log.error(logMsg);
- }
- break;
- case FE_USER_DELETE:
- if (deleteFlowEntry(flowObj, flowEntry) == false) {
- String logMsg = "Cannot remove from Network MAP Flow Entry " +
- flowEntry.flowEntryId() +
- " from Flow Path " + flowEntry.flowId() +
- " on switch " + flowEntry.dpid();
- log.error(logMsg);
- }
- break;
- }
-
- // Commit to the database
- dbHandlerInner.commit();
- break; // Success
-
- } catch (Exception e) {
- log.debug("Exception writing Flow Entry to Network MAP: ", e);
- dbHandlerInner.rollback();
- // Wait a bit (random value [1ms, 20ms] and try again
- int delay = 1 + randomGenerator.nextInt() % 20;
- try {
- Thread.sleep(delay);
- } catch (Exception e0) {
- }
+ boolean allValid = true;
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (flowEntry.flowEntryUserState() ==
+ FlowEntryUserState.FE_USER_DELETE) {
+ continue;
}
+ if (! flowEntry.isValidFlowEntryId()) {
+ allValid = false;
+ break;
+ }
+ }
+ if (! allValid)
+ continue;
+
+ log.debug("Pushing Flow Path To Database: {}", flowPath.toString());
+
+ //
+ // Write the Flow Path to the Network Map
+ //
+ try {
+ if (! FlowDatabaseOperation.addFlow(this, dbHandlerInner,
+ flowPath, dummyFlowId)) {
+ String logMsg = "Cannot write to Network Map Flow Path " +
+ flowPath.flowId();
+ log.error(logMsg);
+ }
+ } catch (Exception e) {
+ log.error("Exception writing Flow Path to Network MAP: ", e);
}
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java
deleted file mode 100644
index 8bed120..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java
+++ /dev/null
@@ -1,689 +0,0 @@
-package net.onrc.onos.ofcontroller.flowmanager;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import net.floodlightcontroller.core.IOFSwitch;
-import net.floodlightcontroller.util.MACAddress;
-import net.floodlightcontroller.util.OFMessageDamper;
-
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
-import net.onrc.onos.ofcontroller.util.*;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
-
-import org.openflow.protocol.OFFlowMod;
-import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.OFPacketOut;
-import org.openflow.protocol.OFPort;
-import org.openflow.protocol.OFType;
-import org.openflow.protocol.action.*;
-import org.openflow.protocol.factory.BasicFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class for performing Flow-related operations on the Switch.
- */
-class FlowSwitchOperation {
- private final static Logger log = LoggerFactory.getLogger(FlowSwitchOperation.class);
- //
- // TODO: Values copied from elsewhere (class LearningSwitch).
- // The local copy should go away!
- //
- public static final short PRIORITY_DEFAULT = 100;
- public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
- public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
-
- // TODO add Pusher instance member
- //
-
- /**
- * Install a Flow Entry on a switch.
- *
- * @param messageFactory the OpenFlow message factory to use.
- * @param messageDamper the OpenFlow message damper to use.
- * @param mySwitch the switch to install the Flow Entry into.
- * @param flowObj the flow path object for the flow entry to install.
- * @param flowEntryObj the flow entry object to install.
- * @return true on success, otherwise false.
- */
- static boolean installFlowEntry(BasicFactory messageFactory,
- OFMessageDamper messageDamper,
- IOFSwitch mySwitch, IFlowPath flowObj,
- IFlowEntry flowEntryObj) {
- String flowEntryIdStr = flowEntryObj.getFlowEntryId();
- if (flowEntryIdStr == null)
- return false;
- FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
- String userState = flowEntryObj.getUserState();
- if (userState == null)
- return false;
-
- //
- // Create the Open Flow Flow Modification Entry to push
- //
- OFFlowMod fm = (OFFlowMod)messageFactory.getMessage(OFType.FLOW_MOD);
- long cookie = flowEntryId.value();
-
- short flowModCommand = OFFlowMod.OFPFC_ADD;
- if (userState.equals("FE_USER_ADD")) {
- flowModCommand = OFFlowMod.OFPFC_ADD;
- } else if (userState.equals("FE_USER_MODIFY")) {
- flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
- } else if (userState.equals("FE_USER_DELETE")) {
- flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
- } else {
- // Unknown user state. Ignore the entry
- log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
- flowEntryId.toString(), userState);
- return false;
- }
-
- //
- // Fetch the match conditions.
- //
- // NOTE: The Flow matching conditions common for all Flow Entries are
- // used ONLY if a Flow Entry does NOT have the corresponding matching
- // condition set.
- //
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
-
- // Match the Incoming Port
- Short matchInPort = flowEntryObj.getMatchInPort();
- if (matchInPort != null) {
- match.setInputPort(matchInPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
- }
-
- // Match the Source MAC address
- String matchSrcMac = flowEntryObj.getMatchSrcMac();
- if (matchSrcMac == null)
- matchSrcMac = flowObj.getMatchSrcMac();
- if (matchSrcMac != null) {
- match.setDataLayerSource(matchSrcMac);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
- }
-
- // Match the Destination MAC address
- String matchDstMac = flowEntryObj.getMatchDstMac();
- if (matchDstMac == null)
- matchDstMac = flowObj.getMatchDstMac();
- if (matchDstMac != null) {
- match.setDataLayerDestination(matchDstMac);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
- }
-
- // Match the Ethernet Frame Type
- Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType == null)
- matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType != null) {
- match.setDataLayerType(matchEthernetFrameType);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
- }
-
- // Match the VLAN ID
- Short matchVlanId = flowEntryObj.getMatchVlanId();
- if (matchVlanId == null)
- matchVlanId = flowObj.getMatchVlanId();
- if (matchVlanId != null) {
- match.setDataLayerVirtualLan(matchVlanId);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
- }
-
- // Match the VLAN priority
- Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
- if (matchVlanPriority == null)
- matchVlanPriority = flowObj.getMatchVlanPriority();
- if (matchVlanPriority != null) {
- match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
- }
-
- // Match the Source IPv4 Network prefix
- String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net == null)
- matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net != null) {
- match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
- }
-
- // Natch the Destination IPv4 Network prefix
- String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net == null)
- matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net != null) {
- match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
- }
-
- // Match the IP protocol
- Byte matchIpProto = flowEntryObj.getMatchIpProto();
- if (matchIpProto == null)
- matchIpProto = flowObj.getMatchIpProto();
- if (matchIpProto != null) {
- match.setNetworkProtocol(matchIpProto);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
- }
-
- // Match the IP ToS (DSCP field, 6 bits)
- Byte matchIpToS = flowEntryObj.getMatchIpToS();
- if (matchIpToS == null)
- matchIpToS = flowObj.getMatchIpToS();
- if (matchIpToS != null) {
- match.setNetworkTypeOfService(matchIpToS);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
- }
-
- // Match the Source TCP/UDP port
- Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort == null)
- matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort != null) {
- match.setTransportSource(matchSrcTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
- }
-
- // Match the Destination TCP/UDP port
- Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort == null)
- matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort != null) {
- match.setTransportDestination(matchDstTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
- }
-
- //
- // Fetch the actions
- //
- Short actionOutputPort = null;
- List<OFAction> openFlowActions = new ArrayList<OFAction>();
- int actionsLen = 0;
- FlowEntryActions flowEntryActions = null;
- String actionsStr = flowEntryObj.getActions();
- if (actionsStr != null)
- flowEntryActions = new FlowEntryActions(actionsStr);
- else
- flowEntryActions = new FlowEntryActions();
- for (FlowEntryAction action : flowEntryActions.actions()) {
- ActionOutput actionOutput = action.actionOutput();
- ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
- ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
- ActionStripVlan actionStripVlan = action.actionStripVlan();
- ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
- ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
- ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
- ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
- ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
- ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
- ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
- ActionEnqueue actionEnqueue = action.actionEnqueue();
-
- if (actionOutput != null) {
- actionOutputPort = actionOutput.port().value();
- // XXX: The max length is hard-coded for now
- OFActionOutput ofa =
- new OFActionOutput(actionOutput.port().value(),
- (short)0xffff);
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanId != null) {
- OFActionVirtualLanIdentifier ofa =
- new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanPriority != null) {
- OFActionVirtualLanPriorityCodePoint ofa =
- new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionStripVlan != null) {
- if (actionStripVlan.stripVlan() == true) {
- OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- if (actionSetEthernetSrcAddr != null) {
- OFActionDataLayerSource ofa =
- new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetEthernetDstAddr != null) {
- OFActionDataLayerDestination ofa =
- new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4SrcAddr != null) {
- OFActionNetworkLayerSource ofa =
- new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4DstAddr != null) {
- OFActionNetworkLayerDestination ofa =
- new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIpToS != null) {
- OFActionNetworkTypeOfService ofa =
- new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpSrcPort != null) {
- OFActionTransportLayerSource ofa =
- new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpDstPort != null) {
- OFActionTransportLayerDestination ofa =
- new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionEnqueue != null) {
- OFActionEnqueue ofa =
- new OFActionEnqueue(actionEnqueue.port().value(),
- actionEnqueue.queueId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
- .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
- .setPriority(PRIORITY_DEFAULT)
- .setBufferId(OFPacketOut.BUFFER_ID_NONE)
- .setCookie(cookie)
- .setCommand(flowModCommand)
- .setMatch(match)
- .setActions(openFlowActions)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
- fm.setOutPort(OFPort.OFPP_NONE.getValue());
- if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
- (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
- if (actionOutputPort != null)
- fm.setOutPort(actionOutputPort);
- }
-
- //
- // TODO: Set the following flag
- // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
- // See method ForwardingBase::pushRoute()
- //
-
- //
- // Write the message to the switch
- //
- log.debug("MEASUREMENT: Installing flow entry " + userState +
- " into switch DPID: " +
- mySwitch.getStringId() +
- " flowEntryId: " + flowEntryId.toString() +
- " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
- " inPort: " + matchInPort + " outPort: " + actionOutputPort
- );
- try {
- messageDamper.write(mySwitch, fm, null);
- mySwitch.flush();
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and update the SwitchState
- // for a flow entry after the Barrier message is
- // is received.
- //
- flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
- } catch (IOException e) {
- log.error("Failure writing flow mod from network map", e);
- return false;
- }
-
- return true;
- }
-
- /**
- * Install a Flow Entry on a switch.
- *
- * @param messageFactory the OpenFlow message factory to use.
- * @maram messageDamper the OpenFlow message damper to use.
- * @param mySwitch the switch to install the Flow Entry into.
- * @param flowPath the flow path for the flow entry to install.
- * @param flowEntry the flow entry to install.
- * @return true on success, otherwise false.
- */
- static boolean installFlowEntry(BasicFactory messageFactory,
- OFMessageDamper messageDamper,
- IOFSwitch mySwitch, FlowPath flowPath,
- FlowEntry flowEntry) {
- //
- // Create the OpenFlow Flow Modification Entry to push
- //
- OFFlowMod fm = (OFFlowMod)messageFactory.getMessage(OFType.FLOW_MOD);
- long cookie = flowEntry.flowEntryId().value();
-
- short flowModCommand = OFFlowMod.OFPFC_ADD;
- if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
- flowModCommand = OFFlowMod.OFPFC_ADD;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
- flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
- flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
- } else {
- // Unknown user state. Ignore the entry
- log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
- flowEntry.flowEntryId().toString(),
- flowEntry.flowEntryUserState());
- return false;
- }
-
- //
- // Fetch the match conditions.
- //
- // NOTE: The Flow matching conditions common for all Flow Entries are
- // used ONLY if a Flow Entry does NOT have the corresponding matching
- // condition set.
- //
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
- FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
- FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
-
- // Match the Incoming Port
- Port matchInPort = flowEntryMatch.inPort();
- if (matchInPort != null) {
- match.setInputPort(matchInPort.value());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
- }
-
- // Match the Source MAC address
- MACAddress matchSrcMac = flowEntryMatch.srcMac();
- if ((matchSrcMac == null) && (flowPathMatch != null)) {
- matchSrcMac = flowPathMatch.srcMac();
- }
- if (matchSrcMac != null) {
- match.setDataLayerSource(matchSrcMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
- }
-
- // Match the Destination MAC address
- MACAddress matchDstMac = flowEntryMatch.dstMac();
- if ((matchDstMac == null) && (flowPathMatch != null)) {
- matchDstMac = flowPathMatch.dstMac();
- }
- if (matchDstMac != null) {
- match.setDataLayerDestination(matchDstMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
- }
-
- // Match the Ethernet Frame Type
- Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
- if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
- matchEthernetFrameType = flowPathMatch.ethernetFrameType();
- }
- if (matchEthernetFrameType != null) {
- match.setDataLayerType(matchEthernetFrameType);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
- }
-
- // Match the VLAN ID
- Short matchVlanId = flowEntryMatch.vlanId();
- if ((matchVlanId == null) && (flowPathMatch != null)) {
- matchVlanId = flowPathMatch.vlanId();
- }
- if (matchVlanId != null) {
- match.setDataLayerVirtualLan(matchVlanId);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
- }
-
- // Match the VLAN priority
- Byte matchVlanPriority = flowEntryMatch.vlanPriority();
- if ((matchVlanPriority == null) && (flowPathMatch != null)) {
- matchVlanPriority = flowPathMatch.vlanPriority();
- }
- if (matchVlanPriority != null) {
- match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
- }
-
- // Match the Source IPv4 Network prefix
- IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
- if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
- matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
- }
- if (matchSrcIPv4Net != null) {
- match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
- }
-
- // Natch the Destination IPv4 Network prefix
- IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
- if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
- matchDstIPv4Net = flowPathMatch.dstIPv4Net();
- }
- if (matchDstIPv4Net != null) {
- match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
- }
-
- // Match the IP protocol
- Byte matchIpProto = flowEntryMatch.ipProto();
- if ((matchIpProto == null) && (flowPathMatch != null)) {
- matchIpProto = flowPathMatch.ipProto();
- }
- if (matchIpProto != null) {
- match.setNetworkProtocol(matchIpProto);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
- }
-
- // Match the IP ToS (DSCP field, 6 bits)
- Byte matchIpToS = flowEntryMatch.ipToS();
- if ((matchIpToS == null) && (flowPathMatch != null)) {
- matchIpToS = flowPathMatch.ipToS();
- }
- if (matchIpToS != null) {
- match.setNetworkTypeOfService(matchIpToS);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
- }
-
- // Match the Source TCP/UDP port
- Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
- if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
- matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
- }
- if (matchSrcTcpUdpPort != null) {
- match.setTransportSource(matchSrcTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
- }
-
- // Match the Destination TCP/UDP port
- Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
- if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
- matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
- }
- if (matchDstTcpUdpPort != null) {
- match.setTransportDestination(matchDstTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
- }
-
- //
- // Fetch the actions
- //
- Short actionOutputPort = null;
- List<OFAction> openFlowActions = new ArrayList<OFAction>();
- int actionsLen = 0;
- FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- //
- for (FlowEntryAction action : flowEntryActions.actions()) {
- ActionOutput actionOutput = action.actionOutput();
- ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
- ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
- ActionStripVlan actionStripVlan = action.actionStripVlan();
- ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
- ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
- ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
- ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
- ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
- ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
- ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
- ActionEnqueue actionEnqueue = action.actionEnqueue();
-
- if (actionOutput != null) {
- actionOutputPort = actionOutput.port().value();
- // XXX: The max length is hard-coded for now
- OFActionOutput ofa =
- new OFActionOutput(actionOutput.port().value(),
- (short)0xffff);
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanId != null) {
- OFActionVirtualLanIdentifier ofa =
- new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanPriority != null) {
- OFActionVirtualLanPriorityCodePoint ofa =
- new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionStripVlan != null) {
- if (actionStripVlan.stripVlan() == true) {
- OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- if (actionSetEthernetSrcAddr != null) {
- OFActionDataLayerSource ofa =
- new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetEthernetDstAddr != null) {
- OFActionDataLayerDestination ofa =
- new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4SrcAddr != null) {
- OFActionNetworkLayerSource ofa =
- new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4DstAddr != null) {
- OFActionNetworkLayerDestination ofa =
- new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIpToS != null) {
- OFActionNetworkTypeOfService ofa =
- new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpSrcPort != null) {
- OFActionTransportLayerSource ofa =
- new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpDstPort != null) {
- OFActionTransportLayerDestination ofa =
- new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionEnqueue != null) {
- OFActionEnqueue ofa =
- new OFActionEnqueue(actionEnqueue.port().value(),
- actionEnqueue.queueId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
- .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
- .setPriority(PRIORITY_DEFAULT)
- .setBufferId(OFPacketOut.BUFFER_ID_NONE)
- .setCookie(cookie)
- .setCommand(flowModCommand)
- .setMatch(match)
- .setActions(openFlowActions)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
- fm.setOutPort(OFPort.OFPP_NONE.getValue());
- if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
- (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
- if (actionOutputPort != null)
- fm.setOutPort(actionOutputPort);
- }
-
- //
- // TODO: Set the following flag
- // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
- // See method ForwardingBase::pushRoute()
- //
-
- //
- // Write the message to the switch
- //
- log.debug("MEASUREMENT: Installing flow entry " +
- flowEntry.flowEntryUserState() +
- " into switch DPID: " +
- mySwitch.getStringId() +
- " flowEntryId: " + flowEntry.flowEntryId().toString() +
- " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
- " inPort: " + matchInPort + " outPort: " + actionOutputPort
- );
- try {
- messageDamper.write(mySwitch, fm, null);
- mySwitch.flush();
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and update the SwitchState
- // for a flow entry after the Barrier message is
- // is received.
- //
- // TODO: The FlowEntry Object in Titan should be set
- // to FE_SWITCH_UPDATED.
- //
- } catch (IOException e) {
- log.error("Failure writing flow mod from network map", e);
- return false;
- }
- return true;
- }
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
index f39acb5..8d2b797 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
@@ -2,11 +2,12 @@
import java.util.ArrayList;
+import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.module.IFloodlightService;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.util.CallerId;
import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
@@ -42,21 +43,6 @@
boolean deleteFlow(FlowId flowId);
/**
- * Clear the state for all previously added flows.
- *
- * @return true on success, otherwise false.
- */
- boolean clearAllFlows();
-
- /**
- * Clear the state for a previously added flow.
- *
- * @param flowId the Flow ID of the flow to clear.
- * @return true on success, otherwise false.
- */
- boolean clearFlow(FlowId flowId);
-
- /**
* Get a previously added flow.
*
* @param flowId the Flow ID of the flow to get.
@@ -97,7 +83,7 @@
* @param maxFlows number of flows to return
* @return the Flow Paths if found, otherwise null.
*/
- ArrayList<IFlowPath> getAllFlowsSummary(FlowId flowId, int maxFlows);
+ ArrayList<FlowPath> getAllFlowsSummary(FlowId flowId, int maxFlows);
/**
* Add and maintain a shortest-path flow.
@@ -128,4 +114,12 @@
* @return unique flow ID
*/
public long getNextFlowEntryId();
+
+ /**
+ * Inform the Flow Manager that a Flow Entry on switch expired.
+ *
+ * @param sw the switch the Flow Entry expired on.
+ * @param flowEntryId the Flow Entry ID of the expired Flow Entry.
+ */
+ public void flowEntryOnSwitchExpired(IOFSwitch sw, FlowEntryId flowEntryId);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/ClearFlowResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/ClearFlowResource.java
deleted file mode 100644
index b8942b9..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/ClearFlowResource.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package net.onrc.onos.ofcontroller.flowmanager.web;
-
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
-import net.onrc.onos.ofcontroller.util.FlowId;
-
-import org.restlet.resource.Get;
-import org.restlet.resource.ServerResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Flow Manager REST API implementation: Clear internal Flow state.
- *
- * The "{flow-id}" request attribute value can be either a specific Flow ID,
- * or the keyword "all" to clear all Flows:
- *
- * GET /wm/flow/clear/{flow-id}/json
- */
-public class ClearFlowResource extends ServerResource {
- protected final static Logger log = LoggerFactory.getLogger(ClearFlowResource.class);
-
- /**
- * Implement the API.
- *
- * @return true on success, otehrwise false.
- */
- @Get("json")
- public Boolean retrieve() {
- Boolean result = false;
-
- IFlowService flowService =
- (IFlowService)getContext().getAttributes().
- get(IFlowService.class.getCanonicalName());
-
- if (flowService == null) {
- log.debug("ONOS Flow Service not found");
- return result;
- }
-
- // Extract the arguments
- String flowIdStr = (String) getRequestAttributes().get("flow-id");
-
- // Process the request
- if (flowIdStr.equals("all")) {
- log.debug("Clear All Flows");
- result = flowService.clearAllFlows();
- } else {
- FlowId flowId = new FlowId(flowIdStr);
- log.debug("Clear Flow Id: " + flowIdStr);
- result = flowService.clearFlow(flowId);
- }
- return result;
- }
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
index e027270..81d26dd 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
@@ -18,7 +18,6 @@
Router router = new Router(context);
router.attach("/add/json", AddFlowResource.class);
router.attach("/add-shortest-path/json", AddShortestPathFlowResource.class);
- router.attach("/clear/{flow-id}/json", ClearFlowResource.class);
router.attach("/delete/{flow-id}/json", DeleteFlowResource.class);
router.attach("/get/{flow-id}/json", GetFlowByIdResource.class);
router.attach("/getall-by-installer-id/{installer-id}/{src-dpid}/{src-port}/{dst-dpid}/{dst-port}/json", GetAllFlowsByInstallerIdResource.class);
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
index 89e5b01..58f82a9 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
@@ -2,8 +2,8 @@
import java.util.ArrayList;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.FlowId;
import org.restlet.resource.Get;
@@ -31,8 +31,8 @@
* @return the collection of Flow states if any found, otherwise null.
*/
@Get("json")
- public ArrayList<IFlowPath> retrieve() {
- ArrayList<IFlowPath> result = null;
+ public ArrayList<FlowPath> retrieve() {
+ ArrayList<FlowPath> result = null;
FlowId flowId;
int maxFlows = 0;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index a59a9f9..461d231 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -4,26 +4,62 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+
+import org.openflow.protocol.OFFlowRemoved;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitchListener;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.restserver.IRestApiService;
+import net.onrc.onos.ofcontroller.flowprogrammer.web.FlowProgrammerWebRoutable;
+import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
-public class FlowProgrammer implements IFloodlightModule {
- private static final boolean enableFlowSync = false;
-
+/**
+ * FlowProgrammer is a module responsible to maintain flows installed to switches.
+ * FlowProgrammer consists of FlowPusher and FlowSynchronizer.
+ * FlowPusher manages the rate of installation, and FlowSynchronizer synchronizes
+ * flows between GraphDB and switches.
+ * FlowProgrammer also watch the event of addition/deletion of switches to
+ * start/stop synchronization. When a switch is added to network, FlowProgrammer
+ * immediately kicks synchronization to keep switch's flow table latest state.
+ * Adversely, when a switch is removed from network, FlowProgrammer immediately
+ * stops synchronization.
+ * @author Brian
+ *
+ */
+public class FlowProgrammer implements IFloodlightModule,
+ IOFMessageListener,
+ IOFSwitchListener {
+ @SuppressWarnings("unused")
+ // flag to enable FlowSynchronizer
+ private static final boolean enableFlowSync = false;
+ protected static Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
protected volatile IFloodlightProviderService floodlightProvider;
+ protected volatile IControllerRegistryService registryService;
+ protected volatile IRestApiService restApi;
+ protected volatile IFlowService flowManager;
protected FlowPusher pusher;
private static final int NUM_PUSHER_THREAD = 1;
protected FlowSynchronizer synchronizer;
-
+
public FlowProgrammer() {
pusher = new FlowPusher(NUM_PUSHER_THREAD);
if (enableFlowSync) {
- synchronizer = new FlowSynchronizer();
+ synchronizer = new FlowSynchronizer();
}
}
@@ -31,18 +67,21 @@
public void init(FloodlightModuleContext context)
throws FloodlightModuleException {
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
- pusher.init(null, floodlightProvider.getOFMessageFactory(), null);
+ registryService = context.getServiceImpl(IControllerRegistryService.class);
+ restApi = context.getServiceImpl(IRestApiService.class);
+ flowManager = context.getServiceImpl(IFlowService.class);
+ pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
if (enableFlowSync) {
- synchronizer.init(context);
+ synchronizer.init(pusher);
}
}
@Override
public void startUp(FloodlightModuleContext context) {
+ restApi.addRestletRoutable(new FlowProgrammerWebRoutable());
pusher.start();
- if (enableFlowSync) {
- synchronizer.startUp(context);
- }
+ floodlightProvider.addOFMessageListener(OFType.FLOW_REMOVED, this);
+ floodlightProvider.addOFSwitchListener(this);
}
@Override
@@ -51,7 +90,7 @@
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IFlowPusherService.class);
if (enableFlowSync) {
- l.add(IFlowSyncService.class);
+ l.add(IFlowSyncService.class);
}
return l;
}
@@ -64,7 +103,7 @@
IFloodlightService>();
m.put(IFlowPusherService.class, pusher);
if (enableFlowSync) {
- m.put(IFlowSyncService.class, synchronizer);
+ m.put(IFlowSyncService.class, synchronizer);
}
return m;
}
@@ -74,8 +113,65 @@
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IFloodlightProviderService.class);
+ l.add(IRestApiService.class);
return l;
}
-
+ @Override
+ public String getName() {
+ // TODO Auto-generated method stub
+ return "FlowProgrammer";
+ }
+
+ @Override
+ public boolean isCallbackOrderingPrereq(OFType type, String name) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isCallbackOrderingPostreq(OFType type, String name) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+ switch (msg.getType()) {
+ case FLOW_REMOVED:
+ OFFlowRemoved flowMsg = (OFFlowRemoved) msg;
+ log.debug("Got flow removed from "+ sw.getId() +": "+ flowMsg.getCookie());
+ FlowEntryId id = new FlowEntryId(flowMsg.getCookie());
+ flowManager.flowEntryOnSwitchExpired(sw, id);
+ break;
+ default:
+ break;
+ }
+
+ return Command.CONTINUE;
+ }
+
+ @Override
+ public void addedSwitch(IOFSwitch sw) {
+ log.debug("Switch added: {}", sw.getId());
+
+ if (enableFlowSync && registryService.hasControl(sw.getId())) {
+ synchronizer.synchronize(sw);
+ }
+ }
+
+ @Override
+ public void removedSwitch(IOFSwitch sw) {
+ log.debug("Switch removed: {}", sw.getId());
+
+ if (enableFlowSync) {
+ synchronizer.interrupt(sw);
+ }
+ }
+
+ @Override
+ public void switchPortChanged(Long switchId) {
+ // TODO Auto-generated method stub
+ }
+
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 532477a..438f478 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -5,9 +5,12 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
import org.openflow.protocol.*;
import org.openflow.protocol.action.*;
@@ -16,11 +19,14 @@
import org.slf4j.LoggerFactory;
import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.floodlightcontroller.util.MACAddress;
import net.floodlightcontroller.util.OFMessageDamper;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
import net.onrc.onos.ofcontroller.util.FlowEntryAction;
import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
import net.onrc.onos.ofcontroller.util.FlowEntry;
@@ -28,17 +34,22 @@
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
-import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.IPv4Net;
import net.onrc.onos.ofcontroller.util.Port;
/**
- * FlowPusher intermediates FlowManager/FlowSynchronizer and switches to push OpenFlow
- * messages to switches in proper rate.
+ * FlowPusher is a implementation of FlowPusherService.
+ * FlowPusher assigns one message queue instance for each one switch.
+ * Number of message processing threads is configurable by constructor, and
+ * one thread can handle multiple message queues. Each queue will be assigned to
+ * a thread according to hash function defined by getHash().
+ * Each processing thread reads messages from queues and sends it to switches
+ * in round-robin. Processing thread also calculates rate of sending to suppress
+ * excessive message sending.
* @author Naoki Shiota
*
*/
-public class FlowPusher implements IFlowPusherService {
+public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
// NOTE: Below are moved from FlowManager.
@@ -48,10 +59,6 @@
protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
- // Interval of sleep when queue is empty
- protected static final long SLEEP_MILLI_SEC = 10;
- protected static final int SLEEP_NANO_SEC = 0;
-
// Number of messages sent to switch at once
protected static final int MAX_MESSAGE_SEND = 100;
@@ -64,15 +71,24 @@
SUSPENDED,
}
+ /**
+ * SwitchQueue represents message queue attached to a switch.
+ * This consists of queue itself and variables used for limiting sending rate.
+ * @author Naoki Shiota
+ *
+ */
@SuppressWarnings("serial")
private class SwitchQueue extends ArrayDeque<OFMessage> {
QueueState state;
- // Max rate of sending message (bytes/sec). 0 implies no limitation.
+ // Max rate of sending message (bytes/ms). 0 implies no limitation.
long max_rate = 0; // 0 indicates no limitation
long last_sent_time = 0;
long last_sent_size = 0;
+ // "To be deleted" flag
+ boolean toBeDeleted = false;
+
/**
* Check if sending rate is within the rate
* @param current Current time
@@ -84,15 +100,20 @@
return true;
}
- long rate = last_sent_size / (current - last_sent_time);
-
- if (rate < max_rate) {
- return true;
- } else {
+ if (current == last_sent_time) {
return false;
}
+
+ // Check if sufficient time (from aspect of rate) elapsed or not.
+ long rate = last_sent_size / (current - last_sent_time);
+ return (rate < max_rate);
}
+ /**
+ * Log time and size of last sent data.
+ * @param current Time to be sent.
+ * @param size Size of sent data (in bytes).
+ */
void logSentData(long current, long size) {
last_sent_time = current;
last_sent_size = size;
@@ -100,11 +121,17 @@
}
- private OFMessageDamper messageDamper;
+ private OFMessageDamper messageDamper = null;
+ private IThreadPoolService threadPool = null;
private FloodlightContext context = null;
private BasicFactory factory = null;
- private Map<Long, FlowPusherProcess> threadMap = null;
+
+ // Map of threads versus dpid
+ private Map<Long, FlowPusherThread> threadMap = null;
+ // Map of Future objects versus dpid and transaction ID.
+ private Map<Long, Map<Integer, OFBarrierReplyFuture>>
+ barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
private int number_thread = 1;
@@ -113,29 +140,35 @@
* @author Naoki Shiota
*
*/
- private class FlowPusherProcess implements Runnable {
+ private class FlowPusherThread extends Thread {
private Map<IOFSwitch,SwitchQueue> queues
- = new HashMap<IOFSwitch,SwitchQueue>();
+ = new HashMap<IOFSwitch,SwitchQueue>();
- private boolean isStopped = false;
- private boolean isMsgAdded = false;
+ // reusable latch used for waiting for arrival of message
+ private Semaphore mutex = new Semaphore(0);
@Override
public void run() {
- log.debug("Begin Flow Pusher Process");
-
while (true) {
- Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
- synchronized (queues) {
- entries = queues.entrySet();
+ try {
+ // wait for message pushed to queue
+ mutex.acquire();
+ } catch (InterruptedException e) {
+ // not an error
+ log.debug("FlowPusherThread is interrupted");
+ return;
}
- // Set taint flag to false at this moment.
- isMsgAdded = false;
+ // for safety of concurrent access, copy all key objects
+ Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
+ synchronized (queues) {
+ for (IOFSwitch sw : queues.keySet()) {
+ keys.add(sw);
+ }
+ }
- for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
- IOFSwitch sw = entry.getKey();
- SwitchQueue queue = entry.getValue();
+ for (IOFSwitch sw : keys) {
+ SwitchQueue queue = queues.get(sw);
// Skip if queue is suspended
if (sw == null || queue == null ||
@@ -143,76 +176,102 @@
continue;
}
- // check sending rate and determine it to be sent or not
- long current_time = System.nanoTime();
- long size = 0;
-
synchronized (queue) {
- if (queue.isSendable(current_time)) {
- int i = 0;
- while (! queue.isEmpty()) {
- // Number of messages excess the limit
- if (++i >= MAX_MESSAGE_SEND) {
- // Messages remains in queue
- isMsgAdded = true;
- break;
- }
-
- OFMessage msg = queue.poll();
-
- // if need to send, call IOFSwitch#write()
- try {
- messageDamper.write(sw, msg, context);
- log.debug("Pusher sends message : {}", msg);
- size += msg.getLength();
- } catch (IOException e) {
- e.printStackTrace();
- log.error("Exception in sending message ({}) : {}", msg, e);
+ processQueue(sw, queue, MAX_MESSAGE_SEND);
+ if (queue.isEmpty()) {
+ // remove queue if flagged to be.
+ if (queue.toBeDeleted) {
+ synchronized (queues) {
+ queues.remove(sw);
}
}
- sw.flush();
- queue.logSentData(current_time, size);
+ } else {
+ // if some messages remains in queue, latch down
+ if (mutex.availablePermits() == 0) {
+ mutex.release();
+ }
}
}
}
-
- // sleep while all queues are empty
- while (! (isMsgAdded || isStopped)) {
+ }
+ }
+
+ /**
+ * Read messages from queue and send them to the switch.
+ * If number of messages excess the limit, stop sending messages.
+ * @param sw Switch to which messages will be sent.
+ * @param queue Queue of messages.
+ * @param max_msg Limitation of number of messages to be sent. If set to 0,
+ * all messages in queue will be sent.
+ */
+ private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
+ // check sending rate and determine it to be sent or not
+ long current_time = System.currentTimeMillis();
+ long size = 0;
+
+ if (queue.isSendable(current_time)) {
+ int i = 0;
+ while (! queue.isEmpty()) {
+ // Number of messages excess the limit
+ if (0 < max_msg && max_msg <= i) {
+ break;
+ }
+ ++i;
+
+ OFMessage msg = queue.poll();
try {
- Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
- } catch (InterruptedException e) {
+ messageDamper.write(sw, msg, context);
+// log.debug("Pusher sends message : {}", msg);
+ size += msg.getLength();
+ } catch (IOException e) {
e.printStackTrace();
- log.error("Thread.sleep failed");
+ log.error("Exception in sending message ({}) : {}", msg, e);
}
}
-
- log.debug("Exit sleep loop.");
-
- if (isStopped) {
- log.debug("Pusher Process finished.");
- return;
- }
-
+ sw.flush();
+ queue.logSentData(current_time, size);
}
}
}
+ /**
+ * Initialize object with one thread.
+ */
public FlowPusher() {
-
}
+ /**
+ * Initialize object with threads of given number.
+ * @param number_thread Number of threads to handle messages.
+ */
public FlowPusher(int number_thread) {
this.number_thread = number_thread;
}
- public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
+ /**
+ * Set parameters needed for sending messages.
+ * @param context FloodlightContext used for sending messages.
+ * If null, FlowPusher uses default context.
+ * @param modContext FloodlightModuleContext used for acquiring
+ * ThreadPoolService and registering MessageListener.
+ * @param factory Factory object to create OFMessage objects.
+ * @param damper Message damper used for sending messages.
+ * If null, FlowPusher creates its own damper object.
+ */
+ public void init(FloodlightContext context,
+ FloodlightModuleContext modContext,
+ BasicFactory factory,
+ OFMessageDamper damper) {
this.context = context;
this.factory = factory;
+ this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
+ IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
+ flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
if (damper != null) {
messageDamper = damper;
} else {
- // use default value
+ // use default values
messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
EnumSet.of(OFType.FLOW_MOD),
OFMESSAGE_DAMPER_TIMEOUT);
@@ -228,20 +287,15 @@
return;
}
- threadMap = new HashMap<Long,FlowPusherProcess>();
+ threadMap = new HashMap<Long,FlowPusherThread>();
for (long i = 0; i < number_thread; ++i) {
- FlowPusherProcess runnable = new FlowPusherProcess();
- threadMap.put(i, runnable);
+ FlowPusherThread thread = new FlowPusherThread();
- Thread thread = new Thread(runnable);
+ threadMap.put(i, thread);
thread.start();
}
}
- /**
- * Suspend sending messages to switch.
- * @param sw
- */
@Override
public boolean suspend(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -259,9 +313,6 @@
}
}
- /**
- * Resume sending messages to switch.
- */
@Override
public boolean resume(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -273,15 +324,19 @@
synchronized (queue) {
if (queue.state == QueueState.SUSPENDED) {
queue.state = QueueState.READY;
+
+ // Latch down if queue is not empty
+ FlowPusherThread thread = getProcess(sw);
+ if (! queue.isEmpty() &&
+ thread.mutex.availablePermits() == 0) {
+ thread.mutex.release();
+ }
return true;
}
return false;
}
}
- /**
- * Check if given switch is suspended.
- */
@Override
public boolean isSuspended(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -302,18 +357,12 @@
return;
}
- for (FlowPusherProcess runnable : threadMap.values()) {
- if (! runnable.isStopped) {
- runnable.isStopped = true;
- }
+ for (FlowPusherThread t : threadMap.values()) {
+ t.interrupt();
}
}
- /**
- * Set sending rate to a switch.
- * @param sw Switch.
- * @param rate Rate in bytes/sec.
- */
+ @Override
public void setRate(IOFSwitch sw, long rate) {
SwitchQueue queue = getQueue(sw);
if (queue == null) {
@@ -321,360 +370,82 @@
}
if (rate > 0) {
+ log.debug("rate for {} is set to {}", sw.getId(), rate);
queue.max_rate = rate;
}
}
+
+ @Override
+ public boolean createQueue(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue != null) {
+ return false;
+ }
+
+ FlowPusherThread proc = getProcess(sw);
+ queue = new SwitchQueue();
+ queue.state = QueueState.READY;
+ synchronized (proc.queues) {
+ proc.queues.put(sw, queue);
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw) {
+ return deleteQueue(sw, false);
+ }
- /**
- * Add OFMessage to the queue related to given switch.
- * @param sw Switch to which message is sent.
- * @param msg Message to be sent.
- * @return true if succeed.
- */
+ @Override
+ public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
+ FlowPusherThread proc = getProcess(sw);
+
+ if (forceStop) {
+ synchronized (proc.queues) {
+ SwitchQueue queue = proc.queues.remove(sw);
+ if (queue == null) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ return false;
+ }
+ synchronized (queue) {
+ queue.toBeDeleted = true;
+ }
+ return true;
+ }
+ }
+
@Override
public boolean add(IOFSwitch sw, OFMessage msg) {
- FlowPusherProcess proc = getProcess(sw);
+ FlowPusherThread proc = getProcess(sw);
SwitchQueue queue = proc.queues.get(sw);
+ // create queue at first addition of message
if (queue == null) {
- queue = new SwitchQueue();
- queue.state = QueueState.READY;
- synchronized (proc) {
- proc.queues.put(sw, queue);
- }
+ createQueue(sw);
+ queue = getQueue(sw);
}
synchronized (queue) {
queue.add(msg);
- log.debug("Message is pushed : {}", msg);
+// log.debug("Message is pushed : {}", msg);
}
- proc.isMsgAdded = true;
-
- return true;
- }
-
- /**
- * Create OFMessage from given flow information and add it to the queue.
- * @param sw Switch to which message is sent.
- * @param flowObj FlowPath.
- * @param flowEntryObj FlowEntry.
- * @return true if succeed.
- */
- @Override
- public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
- log.debug("sending : {}, {}", sw, flowObj);
- String flowEntryIdStr = flowEntryObj.getFlowEntryId();
- if (flowEntryIdStr == null)
- return false;
- FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
- String userState = flowEntryObj.getUserState();
- if (userState == null)
- return false;
-
- //
- // Create the Open Flow Flow Modification Entry to push
- //
- OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
- long cookie = flowEntryId.value();
-
- short flowModCommand = OFFlowMod.OFPFC_ADD;
- if (userState.equals("FE_USER_ADD")) {
- flowModCommand = OFFlowMod.OFPFC_ADD;
- } else if (userState.equals("FE_USER_MODIFY")) {
- flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
- } else if (userState.equals("FE_USER_DELETE")) {
- flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
- } else {
- // Unknown user state. Ignore the entry
- log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
- flowEntryId.toString(), userState);
- return false;
+ if (proc.mutex.availablePermits() == 0) {
+ proc.mutex.release();
}
- //
- // Fetch the match conditions.
- //
- // NOTE: The Flow matching conditions common for all Flow Entries are
- // used ONLY if a Flow Entry does NOT have the corresponding matching
- // condition set.
- //
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
-
- // Match the Incoming Port
- Short matchInPort = flowEntryObj.getMatchInPort();
- if (matchInPort != null) {
- match.setInputPort(matchInPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
- }
-
- // Match the Source MAC address
- String matchSrcMac = flowEntryObj.getMatchSrcMac();
- if (matchSrcMac == null)
- matchSrcMac = flowObj.getMatchSrcMac();
- if (matchSrcMac != null) {
- match.setDataLayerSource(matchSrcMac);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
- }
-
- // Match the Destination MAC address
- String matchDstMac = flowEntryObj.getMatchDstMac();
- if (matchDstMac == null)
- matchDstMac = flowObj.getMatchDstMac();
- if (matchDstMac != null) {
- match.setDataLayerDestination(matchDstMac);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
- }
-
- // Match the Ethernet Frame Type
- Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType == null)
- matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType != null) {
- match.setDataLayerType(matchEthernetFrameType);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
- }
-
- // Match the VLAN ID
- Short matchVlanId = flowEntryObj.getMatchVlanId();
- if (matchVlanId == null)
- matchVlanId = flowObj.getMatchVlanId();
- if (matchVlanId != null) {
- match.setDataLayerVirtualLan(matchVlanId);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
- }
-
- // Match the VLAN priority
- Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
- if (matchVlanPriority == null)
- matchVlanPriority = flowObj.getMatchVlanPriority();
- if (matchVlanPriority != null) {
- match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
- }
-
- // Match the Source IPv4 Network prefix
- String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net == null)
- matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net != null) {
- match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
- }
-
- // Match the Destination IPv4 Network prefix
- String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net == null)
- matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net != null) {
- match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
- }
-
- // Match the IP protocol
- Byte matchIpProto = flowEntryObj.getMatchIpProto();
- if (matchIpProto == null)
- matchIpProto = flowObj.getMatchIpProto();
- if (matchIpProto != null) {
- match.setNetworkProtocol(matchIpProto);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
- }
-
- // Match the IP ToS (DSCP field, 6 bits)
- Byte matchIpToS = flowEntryObj.getMatchIpToS();
- if (matchIpToS == null)
- matchIpToS = flowObj.getMatchIpToS();
- if (matchIpToS != null) {
- match.setNetworkTypeOfService(matchIpToS);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
- }
-
- // Match the Source TCP/UDP port
- Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort == null)
- matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort != null) {
- match.setTransportSource(matchSrcTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
- }
-
- // Match the Destination TCP/UDP port
- Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort == null)
- matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort != null) {
- match.setTransportDestination(matchDstTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
- }
-
- //
- // Fetch the actions
- //
- Short actionOutputPort = null;
- List<OFAction> openFlowActions = new ArrayList<OFAction>();
- int actionsLen = 0;
- FlowEntryActions flowEntryActions = null;
- String actionsStr = flowEntryObj.getActions();
- if (actionsStr != null)
- flowEntryActions = new FlowEntryActions(actionsStr);
- else
- flowEntryActions = new FlowEntryActions();
- for (FlowEntryAction action : flowEntryActions.actions()) {
- ActionOutput actionOutput = action.actionOutput();
- ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
- ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
- ActionStripVlan actionStripVlan = action.actionStripVlan();
- ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
- ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
- ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
- ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
- ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
- ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
- ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
- ActionEnqueue actionEnqueue = action.actionEnqueue();
-
- if (actionOutput != null) {
- actionOutputPort = actionOutput.port().value();
- // XXX: The max length is hard-coded for now
- OFActionOutput ofa =
- new OFActionOutput(actionOutput.port().value(),
- (short)0xffff);
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanId != null) {
- OFActionVirtualLanIdentifier ofa =
- new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanPriority != null) {
- OFActionVirtualLanPriorityCodePoint ofa =
- new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionStripVlan != null) {
- if (actionStripVlan.stripVlan() == true) {
- OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- if (actionSetEthernetSrcAddr != null) {
- OFActionDataLayerSource ofa =
- new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetEthernetDstAddr != null) {
- OFActionDataLayerDestination ofa =
- new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4SrcAddr != null) {
- OFActionNetworkLayerSource ofa =
- new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4DstAddr != null) {
- OFActionNetworkLayerDestination ofa =
- new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIpToS != null) {
- OFActionNetworkTypeOfService ofa =
- new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpSrcPort != null) {
- OFActionTransportLayerSource ofa =
- new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpDstPort != null) {
- OFActionTransportLayerDestination ofa =
- new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionEnqueue != null) {
- OFActionEnqueue ofa =
- new OFActionEnqueue(actionEnqueue.port().value(),
- actionEnqueue.queueId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
- .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
- .setPriority(PRIORITY_DEFAULT)
- .setBufferId(OFPacketOut.BUFFER_ID_NONE)
- .setCookie(cookie)
- .setCommand(flowModCommand)
- .setMatch(match)
- .setActions(openFlowActions)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
- fm.setOutPort(OFPort.OFPP_NONE.getValue());
- if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
- (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
- if (actionOutputPort != null)
- fm.setOutPort(actionOutputPort);
- }
-
- //
- // TODO: Set the following flag
- // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
- // See method ForwardingBase::pushRoute()
- //
-
- //
- // Write the message to the switch
- //
- log.debug("MEASUREMENT: Installing flow entry " + userState +
- " into switch DPID: " +
- sw.getStringId() +
- " flowEntryId: " + flowEntryId.toString() +
- " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
- " inPort: " + matchInPort + " outPort: " + actionOutputPort
- );
- add(sw,fm);
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and update the SwitchState
- // for a flow entry after the Barrier message is
- // is received.
- //
- flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
-
return true;
}
- /**
- * Create OFMessage from given flow information and add it to the queue.
- * @param sw Switch to which message is sent.
- * @param flowPath FlowPath.
- * @param flowEntry FlowEntry.
- * @return true if secceed.
- */
@Override
- public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
+ public boolean add(IOFSwitch sw, FlowEntry flowEntry) {
//
// Create the OpenFlow Flow Modification Entry to push
//
@@ -706,7 +477,6 @@
//
OFMatch match = new OFMatch();
match.setWildcards(OFMatch.OFPFW_ALL);
- FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
// Match the Incoming Port
@@ -718,9 +488,6 @@
// Match the Source MAC address
MACAddress matchSrcMac = flowEntryMatch.srcMac();
- if ((matchSrcMac == null) && (flowPathMatch != null)) {
- matchSrcMac = flowPathMatch.srcMac();
- }
if (matchSrcMac != null) {
match.setDataLayerSource(matchSrcMac.toString());
match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
@@ -728,9 +495,6 @@
// Match the Destination MAC address
MACAddress matchDstMac = flowEntryMatch.dstMac();
- if ((matchDstMac == null) && (flowPathMatch != null)) {
- matchDstMac = flowPathMatch.dstMac();
- }
if (matchDstMac != null) {
match.setDataLayerDestination(matchDstMac.toString());
match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
@@ -738,9 +502,6 @@
// Match the Ethernet Frame Type
Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
- if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
- matchEthernetFrameType = flowPathMatch.ethernetFrameType();
- }
if (matchEthernetFrameType != null) {
match.setDataLayerType(matchEthernetFrameType);
match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
@@ -748,9 +509,6 @@
// Match the VLAN ID
Short matchVlanId = flowEntryMatch.vlanId();
- if ((matchVlanId == null) && (flowPathMatch != null)) {
- matchVlanId = flowPathMatch.vlanId();
- }
if (matchVlanId != null) {
match.setDataLayerVirtualLan(matchVlanId);
match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
@@ -758,9 +516,6 @@
// Match the VLAN priority
Byte matchVlanPriority = flowEntryMatch.vlanPriority();
- if ((matchVlanPriority == null) && (flowPathMatch != null)) {
- matchVlanPriority = flowPathMatch.vlanPriority();
- }
if (matchVlanPriority != null) {
match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
match.setWildcards(match.getWildcards()
@@ -769,27 +524,18 @@
// Match the Source IPv4 Network prefix
IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
- if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
- matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
- }
if (matchSrcIPv4Net != null) {
match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
}
// Natch the Destination IPv4 Network prefix
IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
- if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
- matchDstIPv4Net = flowPathMatch.dstIPv4Net();
- }
if (matchDstIPv4Net != null) {
match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
}
// Match the IP protocol
Byte matchIpProto = flowEntryMatch.ipProto();
- if ((matchIpProto == null) && (flowPathMatch != null)) {
- matchIpProto = flowPathMatch.ipProto();
- }
if (matchIpProto != null) {
match.setNetworkProtocol(matchIpProto);
match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
@@ -797,9 +543,6 @@
// Match the IP ToS (DSCP field, 6 bits)
Byte matchIpToS = flowEntryMatch.ipToS();
- if ((matchIpToS == null) && (flowPathMatch != null)) {
- matchIpToS = flowPathMatch.ipToS();
- }
if (matchIpToS != null) {
match.setNetworkTypeOfService(matchIpToS);
match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
@@ -807,9 +550,6 @@
// Match the Source TCP/UDP port
Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
- if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
- matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
- }
if (matchSrcTcpUdpPort != null) {
match.setTransportSource(matchSrcTcpUdpPort);
match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
@@ -817,9 +557,6 @@
// Match the Destination TCP/UDP port
Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
- if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
- matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
- }
if (matchDstTcpUdpPort != null) {
match.setTransportDestination(matchDstTcpUdpPort);
match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
@@ -965,7 +702,7 @@
//
// Write the message to the switch
//
- log.debug("MEASUREMENT: Installing flow entry "
+ log.debug("Installing flow entry "
+ flowEntry.flowEntryUserState() + " into switch DPID: "
+ sw.getStringId() + " flowEntryId: "
+ flowEntry.flowEntryId().toString() + " srcMac: "
@@ -984,8 +721,59 @@
return add(sw,fm);
}
+
+ @Override
+ public OFBarrierReply barrier(IOFSwitch sw) {
+ OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+ if (future == null) {
+ return null;
+ }
+
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ log.error("InterruptedException: {}", e);
+ return null;
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ log.error("ExecutionException: {}", e);
+ return null;
+ }
+ }
- private SwitchQueue getQueue(IOFSwitch sw) {
+ @Override
+ public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+ // TODO creation of message and future should be moved to OFSwitchImpl
+
+ if (sw == null) {
+ return null;
+ }
+
+ OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+ msg.setXid(sw.getNextTransactionId());
+
+ OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+ synchronized (barrierFutures) {
+ Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
+ if (map == null) {
+ map = new HashMap<Integer,OFBarrierReplyFuture>();
+ barrierFutures.put(sw.getId(), map);
+ }
+ map.put(msg.getXid(), future);
+ }
+
+ add(sw, msg);
+
+ return future;
+ }
+
+ /**
+ * Get a queue attached to a switch.
+ * @param sw Switch object
+ * @return Queue object
+ */
+ protected SwitchQueue getQueue(IOFSwitch sw) {
if (sw == null) {
return null;
}
@@ -993,14 +781,61 @@
return getProcess(sw).queues.get(sw);
}
- private long getHash(IOFSwitch sw) {
- // TODO should consider equalization algorithm
+ /**
+ * Get a hash value correspondent to a switch.
+ * @param sw Switch object
+ * @return Hash value
+ */
+ protected long getHash(IOFSwitch sw) {
+ // This code assumes DPID is sequentially assigned.
+ // TODO consider equalization algorithm
return sw.getId() % number_thread;
}
-
- private FlowPusherProcess getProcess(IOFSwitch sw) {
+
+ /**
+ * Get a Thread object which processes the queue attached to a switch.
+ * @param sw Switch object
+ * @return Thread object
+ */
+ protected FlowPusherThread getProcess(IOFSwitch sw) {
long hash = getHash(sw);
return threadMap.get(hash);
}
+
+ @Override
+ public String getName() {
+ return "flowpusher";
+ }
+
+ @Override
+ public boolean isCallbackOrderingPrereq(OFType type, String name) {
+ return false;
+ }
+
+ @Override
+ public boolean isCallbackOrderingPostreq(OFType type, String name) {
+ return false;
+ }
+
+ @Override
+ public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+ Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
+ if (map == null) {
+ log.debug("null map for {} : {}", sw.getId(), barrierFutures);
+ return Command.CONTINUE;
+ }
+
+ OFBarrierReplyFuture future = map.get(msg.getXid());
+ if (future == null) {
+ log.debug("null future for {} : {}", msg.getXid(), map);
+ return Command.CONTINUE;
+ }
+
+ log.debug("Received BARRIER_REPLY : {}", msg);
+ future.deliverFuture(sw, msg);
+
+ return Command.CONTINUE;
+ }
+
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index b2e4552..c357e7c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -2,7 +2,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -13,23 +12,8 @@
import org.openflow.protocol.OFFlowMod;
import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFPacketOut;
import org.openflow.protocol.OFPort;
import org.openflow.protocol.OFStatisticsRequest;
-import org.openflow.protocol.action.OFAction;
-import org.openflow.protocol.action.OFActionDataLayerDestination;
-import org.openflow.protocol.action.OFActionDataLayerSource;
-import org.openflow.protocol.action.OFActionEnqueue;
-import org.openflow.protocol.action.OFActionNetworkLayerDestination;
-import org.openflow.protocol.action.OFActionNetworkLayerSource;
-import org.openflow.protocol.action.OFActionNetworkTypeOfService;
-import org.openflow.protocol.action.OFActionOutput;
-import org.openflow.protocol.action.OFActionStripVirtualLan;
-import org.openflow.protocol.action.OFActionTransportLayerDestination;
-import org.openflow.protocol.action.OFActionTransportLayerSource;
-import org.openflow.protocol.action.OFActionVirtualLanIdentifier;
-import org.openflow.protocol.action.OFActionVirtualLanPriorityCodePoint;
import org.openflow.protocol.statistics.OFFlowStatisticsReply;
import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
import org.openflow.protocol.statistics.OFStatistics;
@@ -37,107 +21,70 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.tinkerpop.blueprints.Direction;
-
-import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IOFSwitch;
-import net.floodlightcontroller.core.IOFSwitchListener;
-import net.floodlightcontroller.core.module.FloodlightModuleContext;
-import net.floodlightcontroller.core.module.FloodlightModuleException;
-import net.floodlightcontroller.core.module.IFloodlightModule;
-import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.restserver.IRestApiService;
-import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.graph.GraphDBOperation;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
-import net.onrc.onos.ofcontroller.core.module.IOnosService;
-import net.onrc.onos.ofcontroller.floodlightlistener.INetworkGraphService;
+import net.onrc.onos.ofcontroller.flowmanager.FlowDatabaseOperation;
import net.onrc.onos.ofcontroller.util.Dpid;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction;
-import net.onrc.onos.ofcontroller.util.FlowEntryActions;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionEnqueue;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionOutput;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetEthernetAddr;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetIPv4Addr;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetIpToS;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetTcpUdpPort;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetVlanId;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetVlanPriority;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionStripVlan;
-import net.onrc.onos.registry.controller.IControllerRegistryService;
-public class FlowSynchronizer implements IFlowSyncService, IOFSwitchListener {
+/**
+ * FlowSynchronizer is an implementation of FlowSyncService.
+ * In addition to IFlowSyncService, FlowSynchronizer periodically reads flow
+ * tables from switches and compare them with GraphDB to drop unnecessary
+ * flows and/or to install missing flows.
+ * @author Brian
+ *
+ */
+public class FlowSynchronizer implements IFlowSyncService {
- protected static Logger log = LoggerFactory.getLogger(FlowSynchronizer.class);
- protected IFloodlightProviderService floodlightProvider;
- protected IControllerRegistryService registryService;
- protected IFlowPusherService pusher;
+ private static Logger log = LoggerFactory.getLogger(FlowSynchronizer.class);
private GraphDBOperation dbHandler;
- private Map<IOFSwitch, Thread> switchThread = new HashMap<IOFSwitch, Thread>();
+ protected IFlowPusherService pusher;
+ private Map<IOFSwitch, Thread> switchThreads;
public FlowSynchronizer() {
dbHandler = new GraphDBOperation("");
+ switchThreads = new HashMap<IOFSwitch, Thread>();
}
+ @Override
public void synchronize(IOFSwitch sw) {
- Synchroizer sync = new Synchroizer(sw);
+ Synchronizer sync = new Synchronizer(sw);
Thread t = new Thread(sync);
+ switchThreads.put(sw, t);
t.start();
- switchThread.put(sw, t);
}
-
+
@Override
- public void addedSwitch(IOFSwitch sw) {
- log.debug("Switch added: {}", sw.getId());
-
- if (registryService.hasControl(sw.getId())) {
- synchronize(sw);
- }
- }
-
- @Override
- public void removedSwitch(IOFSwitch sw) {
- log.debug("Switch removed: {}", sw.getId());
-
- Thread t = switchThread.remove(sw);
+ public void interrupt(IOFSwitch sw) {
+ Thread t = switchThreads.remove(sw);
if(t != null) {
t.interrupt();
- }
-
+ }
}
- @Override
- public void switchPortChanged(Long switchId) {
- // TODO Auto-generated method stub
+ /**
+ * Initialize Synchronizer.
+ * @param pusherService FlowPusherService used for sending messages.
+ */
+ public void init(IFlowPusherService pusherService) {
+ pusher = pusherService;
}
- @Override
- public String getName() {
- return "FlowSynchronizer";
- }
-
- //@Override
- public void init(FloodlightModuleContext context)
- throws FloodlightModuleException {
- floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
- registryService = context.getServiceImpl(IControllerRegistryService.class);
- pusher = context.getServiceImpl(IFlowPusherService.class);
- }
-
- //@Override
- public void startUp(FloodlightModuleContext context) {
- floodlightProvider.addOFSwitchListener(this);
- }
-
- protected class Synchroizer implements Runnable {
+ /**
+ * Synchronizer represents main thread of synchronization.
+ * @author Brian
+ *
+ */
+ protected class Synchronizer implements Runnable {
IOFSwitch sw;
ISwitchObject swObj;
- public Synchroizer(IOFSwitch sw) {
+ public Synchronizer(IOFSwitch sw) {
this.sw = sw;
Dpid dpid = new Dpid(sw.getId());
this.swObj = dbHandler.searchSwitch(dpid.toString());
@@ -145,11 +92,21 @@
@Override
public void run() {
+ // TODO: stop adding other flow entries while synchronizing
+ //pusher.suspend(sw);
Set<FlowEntryWrapper> graphEntries = getFlowEntriesFromGraph();
Set<FlowEntryWrapper> switchEntries = getFlowEntriesFromSwitch();
compare(graphEntries, switchEntries);
+ //pusher.resume(sw);
}
+ /**
+ * Compare flows entries in GraphDB and switch to pick up necessary
+ * messages.
+ * After picking up, picked messages are added to FlowPusher.
+ * @param graphEntries Flow entries in GraphDB.
+ * @param switchEntries Flow entries in switch.
+ */
private void compare(Set<FlowEntryWrapper> graphEntries, Set<FlowEntryWrapper> switchEntries) {
int added = 0, removed = 0, skipped = 0;
for(FlowEntryWrapper entry : switchEntries) {
@@ -173,6 +130,10 @@
"Flow entries skipped " + skipped);
}
+ /**
+ * Read GraphDB to get FlowEntries associated with a switch.
+ * @return set of FlowEntries
+ */
private Set<FlowEntryWrapper> getFlowEntriesFromGraph() {
Set<FlowEntryWrapper> entries = new HashSet<FlowEntryWrapper>();
for(IFlowEntry entry : swObj.getFlowEntries()) {
@@ -182,6 +143,10 @@
return entries;
}
+ /**
+ * Read flow table from switch and derive FlowEntries from table.
+ * @return set of FlowEntries
+ */
private Set<FlowEntryWrapper> getFlowEntriesFromSwitch() {
int lengthU = 0;
@@ -228,48 +193,83 @@
}
+ /**
+ * FlowEntryWrapper represents abstract FlowEntry which is embodied
+ * by FlowEntryId (from GraphDB) or OFFlowStatisticsReply (from switch).
+ * @author Brian
+ *
+ */
class FlowEntryWrapper {
- FlowEntryId id;
- IFlowEntry iflowEntry;
+ FlowEntryId flowEntryId;
OFFlowStatisticsReply statisticsReply;
public FlowEntryWrapper(IFlowEntry entry) {
- iflowEntry = entry;
- id = new FlowEntryId(entry.getFlowEntryId());
+ flowEntryId = new FlowEntryId(entry.getFlowEntryId());
}
public FlowEntryWrapper(OFFlowStatisticsReply entry) {
+ flowEntryId = new FlowEntryId(entry.getCookie());
statisticsReply = entry;
- id = new FlowEntryId(entry.getCookie());
}
+ /**
+ * Install this FlowEntry to a switch via FlowPusher.
+ * @param sw Switch to which flow will be installed.
+ */
public void addToSwitch(IOFSwitch sw) {
- if(iflowEntry != null) {
- pusher.add(sw, iflowEntry.getFlow(), iflowEntry);
- }
- else if(statisticsReply != null) {
- log.error("Adding existing flow entry {} to sw {}",
+ if (statisticsReply != null) {
+ log.error("Error adding existing flow entry {} to sw {}",
statisticsReply.getCookie(), sw.getId());
+ return;
}
+
+ // Get the Flow Entry state from the Network Graph
+ IFlowEntry iFlowEntry = null;
+ try {
+ iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
+ } catch (Exception e) {
+ log.error("Error finding flow entry {} in Network Graph",
+ flowEntryId);
+ return;
+ }
+ if (iFlowEntry == null) {
+ log.error("Cannot add flow entry {} to sw {} : flow entry not found",
+ flowEntryId, sw.getId());
+ return;
+ }
+
+ FlowEntry flowEntry =
+ FlowDatabaseOperation.extractFlowEntry(iFlowEntry);
+ if (flowEntry == null) {
+ log.error("Cannot add flow entry {} to sw {} : flow entry cannot be extracted",
+ flowEntryId, sw.getId());
+ return;
+ }
+
+ pusher.add(sw, flowEntry);
}
- public void removeFromSwitch(IOFSwitch sw){
- if(iflowEntry != null) {
- log.error("Removing non-existent flow entry {} from sw {}",
- iflowEntry.getFlowEntryId(), sw.getId());
+ /**
+ * Remove this FlowEntry from a switch via FlowPusher.
+ * @param sw Switch from which flow will be removed.
+ */
+ public void removeFromSwitch(IOFSwitch sw) {
+ if (statisticsReply == null) {
+ log.error("Error removing non-existent flow entry {} from sw {}",
+ flowEntryId, sw.getId());
+ return;
+ }
- }
- else if(statisticsReply != null) {
- // Convert Statistics Reply to Flow Mod, then write it
- OFFlowMod fm = new OFFlowMod();
- fm.setCookie(statisticsReply.getCookie());
- fm.setCommand(OFFlowMod.OFPFC_DELETE_STRICT);
- fm.setLengthU(OFFlowMod.MINIMUM_LENGTH);
- fm.setMatch(statisticsReply.getMatch());
- fm.setPriority(statisticsReply.getPriority());
- fm.setOutPort(OFPort.OFPP_NONE);
- pusher.add(sw, fm);
- }
+ // Convert Statistics Reply to Flow Mod, then write it
+ OFFlowMod fm = new OFFlowMod();
+ fm.setCookie(statisticsReply.getCookie());
+ fm.setCommand(OFFlowMod.OFPFC_DELETE_STRICT);
+ fm.setLengthU(OFFlowMod.MINIMUM_LENGTH);
+ fm.setMatch(statisticsReply.getMatch());
+ fm.setPriority(statisticsReply.getPriority());
+ fm.setOutPort(OFPort.OFPP_NONE);
+
+ pusher.add(sw, fm);
}
/**
@@ -277,7 +277,7 @@
*/
@Override
public int hashCode() {
- return id.hashCode();
+ return flowEntryId.hashCode();
}
/**
@@ -285,22 +285,21 @@
* the same value; otherwise, returns false.
*
* @param Object to compare
+ * @return true if the object has the same Flow Entry ID.
*/
@Override
public boolean equals(Object obj){
if(obj.getClass() == this.getClass()) {
FlowEntryWrapper entry = (FlowEntryWrapper) obj;
// TODO: we need to actually compare the match + actions
- return this.id.equals(entry.id);
+ return this.flowEntryId.equals(entry.flowEntryId);
}
return false;
}
@Override
public String toString() {
- return id.toString();
+ return flowEntryId.toString();
}
}
}
-
-
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index e16dd20..2f550a7 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -1,43 +1,105 @@
package net.onrc.onos.ofcontroller.flowprogrammer;
-import net.floodlightcontroller.core.IOFSwitch;
-import net.floodlightcontroller.core.module.IFloodlightService;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
-import net.onrc.onos.ofcontroller.util.FlowEntry;
-import net.onrc.onos.ofcontroller.util.FlowPath;
-
+import org.openflow.protocol.OFBarrierReply;
import org.openflow.protocol.OFMessage;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+
+/**
+ * FlowPusherService is a service to send message to switches in proper rate.
+ * Conceptually a queue is attached to each switch, and FlowPusherService
+ * read a message from queue and send it to switch in order.
+ * To guarantee message has been installed, FlowPusherService can add barrier
+ * message to queue and can notify when barrier message is sent to switch.
+ * @author Naoki Shiota
+ *
+ */
public interface IFlowPusherService extends IFloodlightService {
/**
- * Add a message to the queue of a switch.
- * @param sw
- * @param msg
- * @return
+ * Create a queue correspondent to the switch.
+ * @param sw Switch to which new queue is attached.
+ * @return true if new queue is successfully created.
+ */
+ boolean createQueue(IOFSwitch sw);
+
+ /**
+ * Delete a queue correspondent to the switch.
+ * Messages remains in queue will be all sent before queue is deleted.
+ * @param sw Switch of which queue is deleted.
+ * @return true if queue is successfully deleted.
+ */
+ boolean deleteQueue(IOFSwitch sw);
+
+ /**
+ * Delete a queue correspondent to the switch.
+ * By setting force flag on, queue will be deleted immediately.
+ * @param sw Switch of which queue is deleted.
+ * @param forceStop If this flag is set to true, queue will be deleted
+ * immediately regardless of any messages in the queue.
+ * If false, all messages will be sent to switch and queue will
+ * be deleted after that.
+ * @return true if queue is successfully deleted or flagged to be deleted.
+ */
+ boolean deleteQueue(IOFSwitch sw, boolean forceStop);
+
+ /**
+ * Add a message to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param msg Message object to be added.
+ * @return true if message is successfully added to a queue.
*/
boolean add(IOFSwitch sw, OFMessage msg);
- boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry);
- boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj);
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
+ boolean add(IOFSwitch sw, FlowEntry flowEntry);
+
+ /**
+ * Set sending rate to a switch.
+ * @param sw Switch.
+ * @param rate Rate in bytes/ms.
+ */
+ public void setRate(IOFSwitch sw, long rate);
+
+ /**
+ * Add BARRIER message to queue and wait for reply.
+ * @param sw Switch to which barrier message is pushed.
+ * @return BARRIER_REPLY message sent from switch.
+ */
+ OFBarrierReply barrier(IOFSwitch sw);
+
+ /**
+ * Add BARRIER message to queue asynchronously.
+ * @param sw Switch to which barrier message is pushed.
+ * @return Future object of BARRIER_REPLY message which will be sent from switch.
+ */
+ OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw);
/**
* Suspend pushing message to a switch.
- * @param sw
+ * @param sw Switch to be suspended pushing message.
* @return true if success
*/
boolean suspend(IOFSwitch sw);
/**
* Resume pushing message to a switch.
- * @param sw
+ * @param sw Switch to be resumed pushing message.
* @return true if success
*/
boolean resume(IOFSwitch sw);
/**
* Get whether pushing of message is suspended or not.
- * @param sw
- * @return true if suspended
+ * @param sw Switch to be checked.
+ * @return true if suspended.
*/
boolean isSuspended(IOFSwitch sw);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
index 1e71f6a..4e6efaf 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
@@ -5,9 +5,13 @@
import net.floodlightcontroller.core.module.IFloodlightService;
/**
- * @author bocon
+ * FlowSyncService is a service to synchronize GraphDB and switch's flow table.
+ * FlowSyncService offers APIs to trigger and interrupt synchronization explicitly.
+ * @author Brian
*
*/
public interface IFlowSyncService extends IFloodlightService {
public void synchronize(IOFSwitch sw);
+
+ public void interrupt(IOFSwitch sw);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
new file mode 100644
index 0000000..3013f5a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
@@ -0,0 +1,49 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import java.util.concurrent.TimeUnit;
+
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFType;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+
+public class OFBarrierReplyFuture extends OFMessageFuture<OFBarrierReply> {
+
+ protected volatile boolean finished;
+
+ public OFBarrierReplyFuture(IThreadPoolService tp,
+ IOFSwitch sw, int transactionId) {
+ super(tp, sw, OFType.FEATURES_REPLY, transactionId);
+ init();
+ }
+
+ public OFBarrierReplyFuture(IThreadPoolService tp,
+ IOFSwitch sw, int transactionId, long timeout, TimeUnit unit) {
+ super(tp, sw, OFType.FEATURES_REPLY, transactionId, timeout, unit);
+ init();
+ }
+
+ private void init() {
+ this.finished = false;
+ this.result = null;
+ }
+
+ @Override
+ protected void handleReply(IOFSwitch sw, OFMessage msg) {
+ this.result = (OFBarrierReply) msg;
+ this.finished = true;
+ }
+
+ @Override
+ protected boolean isFinished() {
+ return finished;
+ }
+
+ @Override
+ protected void unRegister() {
+ super.unRegister();
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java
new file mode 100644
index 0000000..00d7fc9
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java
@@ -0,0 +1,26 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import org.restlet.Context;
+import org.restlet.Restlet;
+import org.restlet.routing.Router;
+
+import net.floodlightcontroller.restserver.RestletRoutable;
+
+public class FlowProgrammerWebRoutable implements RestletRoutable {
+
+ @Override
+ public Restlet getRestlet(Context context) {
+ Router router = new Router(context);
+ router.attach("/pusher/setrate/{dpid}/{rate}/json", SetPushRateResource.class);
+ router.attach("/pusher/suspend/{dpid}/json", SuspendPusherResource.class);
+ router.attach("/pusher/resume/{dpid}/json", ResumePusherResource.class);
+ router.attach("/pusher/barrier/{dpid}/json", SendBarrierResource.class);
+ return router;
+ }
+
+ @Override
+ public String basePath() {
+ return "/wm/fprog";
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/PusherResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/PusherResource.java
new file mode 100644
index 0000000..4e1c0fc
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/PusherResource.java
@@ -0,0 +1,33 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import org.restlet.resource.ServerResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
+
+public class PusherResource extends ServerResource {
+ protected final static Logger log = LoggerFactory.getLogger(PusherResource.class);
+
+ protected IFloodlightProviderService provider;
+ protected IFlowPusherService pusher;
+
+ protected boolean init() {
+ provider = (IFloodlightProviderService)
+ getContext().getAttributes().
+ get(IFloodlightProviderService.class.getCanonicalName());
+ if (provider == null) {
+ log.debug("ONOS FloodlightProvider not found");
+ return false;
+ }
+
+ pusher = (IFlowPusherService)getContext().getAttributes().
+ get(IFlowPusherService.class.getCanonicalName());
+ if (pusher == null) {
+ log.debug("ONOS FlowPusherService not found");
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java
new file mode 100644
index 0000000..dcbe3e9
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: Resume sending message to switch.
+ *
+ * GET /wm/fprog/pusher/resume/{dpid}/json"
+ */
+public class ResumePusherResource extends PusherResource {
+ /**
+ * Implement the API.
+ *
+ * @return true if succeeded, false if failed.
+ */
+ @Get("json")
+ public boolean retrieve() {
+ if (! init()) {
+ return false;
+ }
+
+ long dpid;
+ try {
+ dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ return pusher.resume(sw);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java
new file mode 100644
index 0000000..33b666a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: Send barrier message to switch.
+ *
+ * GET /wm/fprog/pusher/barrier/{dpid}/json"
+ */
+public class SendBarrierResource extends PusherResource {
+ /**
+ * Implement the API.
+ *
+ * @return true if succeeded, false if failed.
+ */
+ @Get("json")
+ public OFBarrierReply retrieve() {
+ if (! init()) {
+ return null;
+ }
+ long dpid;
+ try {
+ dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return null;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return null;
+ }
+
+ return pusher.barrier(sw);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java
new file mode 100644
index 0000000..9431d65
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java
@@ -0,0 +1,47 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: Set sending rate to the switch.
+ *
+ * GET /wm/fprog/pusher/setrate/{dpid}/{rate}/json"
+ */
+public class SetPushRateResource extends PusherResource {
+
+ /**
+ * Implement the API.
+ *
+ * @return true if succeeded, false if failed.
+ */
+ @Get("json")
+ public boolean retrieve() {
+ if (! init()) {
+ return false;
+ }
+
+ long dpid;
+ long rate;
+
+ try {
+ dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
+ rate = Long.valueOf((String)getRequestAttributes().get("rate"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ pusher.setRate(sw, rate);
+
+ return true;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java
new file mode 100644
index 0000000..1a5266b
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java
@@ -0,0 +1,46 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FlowProgrammer REST API implementation: Suspend sending message to switch.
+ *
+ * GET /wm/fprog/pusher/suspend/{dpid}/json"
+ */
+public class SuspendPusherResource extends PusherResource {
+
+ protected final static Logger log = LoggerFactory.getLogger(SetPushRateResource.class);
+
+ /**
+ * Implement the API.
+ *
+ * @return true if succeeded, false if failed.
+ */
+ @Get("json")
+ public boolean retrieve() {
+ if (! init()) {
+ return false;
+ }
+
+ long dpid;
+ try {
+ dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ return pusher.suspend(sw);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntry.java b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntry.java
index 15a6233..98dbd88 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntry.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntry.java
@@ -136,10 +136,11 @@
*
* @return true if the Flow ID is valid, otherwise false.
*/
+ @JsonIgnore
public boolean isValidFlowId() {
if (this.flowId == null)
return false;
- return (this.flowId.value() != 0);
+ return (this.flowId.isValid());
}
/**
@@ -165,10 +166,11 @@
*
* @return true if the Flow Entry ID is valid, otherwise false.
*/
+ @JsonIgnore
public boolean isValidFlowEntryId() {
if (this.flowEntryId == null)
return false;
- return (this.flowEntryId.value() != 0);
+ return (this.flowEntryId.isValid());
}
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryActions.java b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryActions.java
index 53aab66..7d9688b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryActions.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryActions.java
@@ -2,6 +2,7 @@
import java.util.ArrayList;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
/**
@@ -79,6 +80,7 @@
*
* @return true if the set of actions is empty, otherwise false.
*/
+ @JsonIgnore
public Boolean isEmpty() {
return actions.isEmpty();
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java
index 29efe4c..f5728b0 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java
@@ -5,6 +5,7 @@
import net.onrc.onos.ofcontroller.util.serializers.FlowEntryIdDeserializer;
import net.onrc.onos.ofcontroller.util.serializers.FlowEntryIdSerializer;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.map.annotate.JsonDeserialize;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -20,7 +21,7 @@
* Default constructor.
*/
public FlowEntryId() {
- this.value = 0;
+ this.value = -1;
}
/**
@@ -66,7 +67,17 @@
public void setValue(long value) {
this.value = value;
}
-
+
+ /**
+ * Test whether the Flow Entry ID is valid.
+ *
+ * @return true if the Flow Entry ID is valid, otherwise false.
+ */
+ @JsonIgnore
+ public boolean isValid() {
+ return (this.value() != -1);
+ }
+
/**
* Returns true of the object is another Flow Entry ID with
* the same value; otherwise, returns false.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/FlowId.java b/src/main/java/net/onrc/onos/ofcontroller/util/FlowId.java
index de955ba..d90e96f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/FlowId.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/FlowId.java
@@ -5,6 +5,7 @@
import net.onrc.onos.ofcontroller.util.serializers.FlowIdDeserializer;
import net.onrc.onos.ofcontroller.util.serializers.FlowIdSerializer;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.map.annotate.JsonDeserialize;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -20,7 +21,7 @@
* Default constructor.
*/
public FlowId() {
- this.value = 0;
+ this.value = -1;
}
/**
@@ -68,6 +69,16 @@
}
/**
+ * Test whether the Flow ID is valid.
+ *
+ * @return true if the Flow ID is valid, otherwise false.
+ */
+ @JsonIgnore
+ public boolean isValid() {
+ return (this.value() != -1);
+ }
+
+ /**
* Convert the Flow ID value to a hexadecimal string.
*
* @return the Flow ID value to a hexadecimal string.
diff --git a/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java b/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
index 7fd0f67..8a727d3 100644
--- a/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
+++ b/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
@@ -239,49 +239,48 @@
verifyAll();
assertTrue(result);
}
-
+
/**
- * Test method for {@link FlowManager#deleteAllFlows()}.
- * @throws Exception
+ * Test method for {@link FlowManager#deleteFlow(FlowId)}.
+ * @throws Exception
*/
@Test
- public final void testDeleteAllFlowsSuccessNormally() throws Exception {
+ public final void testDeleteFlowSuccessNormally() throws Exception {
// create mock objects
- IFlowPath flowPath1 = createNiceMock(IFlowPath.class);
- IFlowPath flowPath2 = createNiceMock(IFlowPath.class);
+ IFlowPath flowPath = createIFlowPathMock(123, "id", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 1, 2, 3, 4);
+ IFlowEntry flowEntry1 = createMock(IFlowEntry.class);
+ IFlowEntry flowEntry2 = createMock(IFlowEntry.class);
+ IFlowEntry flowEntry3 = createMock(IFlowEntry.class);
// instantiate required objects
- ArrayList<IFlowPath> flowPaths = new ArrayList<IFlowPath>();
- flowPaths.add(flowPath1);
- flowPaths.add(flowPath2);
FlowManager fm = new FlowManager();
-
+ FlowId flowId = new FlowId(123);
+ ArrayList<IFlowEntry> flowEntries = new ArrayList<IFlowEntry>();
+ flowEntries.add(flowEntry1);
+ flowEntries.add(flowEntry2);
+ flowEntries.add(flowEntry3);
+
// setup expectations
expectInitWithContext();
- expect(op.getAllFlowPaths()).andReturn(flowPaths);
-
- expect(flowPath1.getFlowId()).andReturn("1").anyTimes();
- expect(op.searchFlowPath(cmpEq(new FlowId(1)))).andReturn(flowPath1);
- expect(flowPath1.getFlowEntries()).andReturn(new ArrayList<IFlowEntry>());
- op.removeFlowPath(flowPath1);
-
- expect(flowPath2.getFlowId()).andReturn("2").anyTimes();
- expect(op.searchFlowPath(cmpEq(new FlowId(2)))).andReturn(flowPath2);
- expect(flowPath2.getFlowEntries()).andReturn(new ArrayList<IFlowEntry>());
- op.removeFlowPath(flowPath2);
-
+ expect(op.searchFlowPath(cmpEq(flowId))).andReturn(flowPath);
+ expect(flowPath.getFlowEntries()).andReturn(flowEntries);
+ flowPath.removeFlowEntry(flowEntry1);
+ flowPath.removeFlowEntry(flowEntry2);
+ flowPath.removeFlowEntry(flowEntry3);
+ op.removeFlowEntry(flowEntry1);
+ op.removeFlowEntry(flowEntry2);
+ op.removeFlowEntry(flowEntry3);
+ op.removeFlowPath(flowPath);
op.commit();
- expectLastCall().anyTimes();
// start the test
replayAll();
fm.init(context);
- Boolean result = fm.deleteAllFlows();
+ fm.deleteFlow(flowId);
// verify the test
verifyAll();
- assertTrue(result);
}
/**
@@ -316,16 +315,16 @@
}
/**
- * Test method for {@link FlowManager#clearAllFlows()}.
+ * Test method for {@link FlowManager#deleteAllFlows()}.
* @throws Exception
*/
@Test
- public final void testClearAllFlowsSuccessNormally() throws Exception {
+ public final void testDeleteAllFlowsSuccessNormally() throws Exception {
// create mock objects
IFlowPath flowPath1 = createNiceMock(IFlowPath.class);
IFlowPath flowPath2 = createNiceMock(IFlowPath.class);
IFlowPath flowPath3 = createNiceMock(IFlowPath.class);
- FlowManager fm = createPartialMockAndInvokeDefaultConstructor(FlowManager.class, "clearFlow");
+ FlowManager fm = createPartialMockAndInvokeDefaultConstructor(FlowManager.class, "deleteFlow");
// instantiate required objects
ArrayList<IFlowPath> flowPaths = new ArrayList<IFlowPath>();
@@ -340,16 +339,16 @@
expect(flowPath1.getFlowId()).andReturn(new FlowId(1).toString());
expect(flowPath2.getFlowId()).andReturn(null);
expect(flowPath3.getFlowId()).andReturn(new FlowId(3).toString());
- expect(fm.clearFlow(cmpEq(new FlowId(1)))).andReturn(true);
- expect(fm.clearFlow(cmpEq(new FlowId(3)))).andReturn(true);
+ expect(fm.deleteFlow(cmpEq(new FlowId(1)))).andReturn(true);
+ expect(fm.deleteFlow(cmpEq(new FlowId(3)))).andReturn(true);
// start the test
replayAll();
fm.init(context);
- Boolean result = fm.clearAllFlows();
+ Boolean result = fm.deleteAllFlows();
- //verify the test
+ // verify the test
verifyAll();
assertTrue(result);
}
@@ -464,35 +463,35 @@
*/
@Test
public final void testGetAllFlowsSummarySuccessNormally() throws Exception {
- final String getAllFlowsWithoutFlowEntries = "getAllFlowsWithoutFlowEntries";
+ final String getAllFlowsWithDataPathSummary = "getAllFlowsWithDataPathSummary";
// create mock objects
- FlowManager fm = createPartialMockAndInvokeDefaultConstructor(FlowManager.class, getAllFlowsWithoutFlowEntries);
- IFlowPath flowPath1 = createIFlowPathMock(1, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 1, 2, 3, 4);
- IFlowPath flowPath2 = createIFlowPathMock(5, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 2, 3, 4, 5);
- IFlowPath flowPath3 = createIFlowPathMock(10, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 3, 4, 5, 6);
+ FlowManager fm = createPartialMockAndInvokeDefaultConstructor(FlowManager.class, getAllFlowsWithDataPathSummary);
+ FlowPath flowPath1 = createTestFlowPath(1, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 1, 2, 3, 4);
+ FlowPath flowPath2 = createTestFlowPath(5, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 2, 3, 4, 5);
+ FlowPath flowPath3 = createTestFlowPath(10, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 3, 4, 5, 6);
// instantiate required objects
- ArrayList<IFlowPath> flows = new ArrayList<IFlowPath>();
+ ArrayList<FlowPath> flows = new ArrayList<FlowPath>();
flows.add(flowPath3);
flows.add(flowPath1);
flows.add(flowPath2);
// setup expectations
expectInitWithContext();
- expectPrivate(fm, getAllFlowsWithoutFlowEntries).andReturn(flows);
+ expectPrivate(fm, getAllFlowsWithDataPathSummary).andReturn(flows);
// start the test
replayAll();
fm.init(context);
- ArrayList<IFlowPath> returnedFlows = fm.getAllFlowsSummary(null, 0);
+ ArrayList<FlowPath> returnedFlows = fm.getAllFlowsSummary(null, 0);
// verify the test
verifyAll();
assertEquals(3, returnedFlows.size());
- assertEquals(1, new FlowId(returnedFlows.get(0).getFlowId()).value());
- assertEquals(5, new FlowId(returnedFlows.get(1).getFlowId()).value());
- assertEquals(10, new FlowId(returnedFlows.get(2).getFlowId()).value());
+ assertEquals(1, new FlowId(returnedFlows.get(0).flowId().value()).value());
+ assertEquals(5, new FlowId(returnedFlows.get(1).flowId().value()).value());
+ assertEquals(10, new FlowId(returnedFlows.get(2).flowId().value()).value());
}
/**
@@ -786,85 +785,6 @@
// other methods
-
- /**
- * Test method for {@link FlowManager#clearFlow(FlowId)}.
- * @throws Exception
- */
- @Test
- public final void testClearFlowSuccessNormally() throws Exception {
- // create mock objects
- IFlowPath flowPath = createIFlowPathMock(123, "id", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 1, 2, 3, 4);
- IFlowEntry flowEntry1 = createMock(IFlowEntry.class);
- IFlowEntry flowEntry2 = createMock(IFlowEntry.class);
- IFlowEntry flowEntry3 = createMock(IFlowEntry.class);
-
- // instantiate required objects
- FlowManager fm = new FlowManager();
- FlowId flowId = new FlowId(123);
- ArrayList<IFlowEntry> flowEntries = new ArrayList<IFlowEntry>();
- flowEntries.add(flowEntry1);
- flowEntries.add(flowEntry2);
- flowEntries.add(flowEntry3);
-
- // setup expectations
- expectInitWithContext();
- expect(op.searchFlowPath(cmpEq(flowId))).andReturn(flowPath);
- expect(flowPath.getFlowEntries()).andReturn(flowEntries);
- flowPath.removeFlowEntry(flowEntry1);
- flowPath.removeFlowEntry(flowEntry2);
- flowPath.removeFlowEntry(flowEntry3);
- op.removeFlowEntry(flowEntry1);
- op.removeFlowEntry(flowEntry2);
- op.removeFlowEntry(flowEntry3);
- op.removeFlowPath(flowPath);
- op.commit();
-
- // start the test
- replayAll();
-
- fm.init(context);
- fm.clearFlow(flowId);
-
- // verify the test
- verifyAll();
- }
-
- /**
- * Test method for {@link FlowManager#getAllFlowsWithoutFlowEntries()}.
- * @throws Exception
- */
- @Test
- public final void testGetAllFlowsWithoutFlowEntriesSuccessNormally() throws Exception {
- // create mock objects
- IFlowPath iFlowPath1 = createIFlowPathMock(1, "caller id", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 1, 1, 2, 2);
- IFlowPath iFlowPath2 = createIFlowPathMock(2, "caller id", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 2, 5, 3, 5);
-
- // instantiate required objects
- ArrayList<IFlowPath> flowPaths = new ArrayList<IFlowPath>();
- flowPaths.add(iFlowPath1);
- flowPaths.add(iFlowPath2);
- FlowManager fm = new FlowManager();
-
- // setup expectations
- expectInitWithContext();
- op.commit();
- expect(op.getAllFlowPaths()).andReturn(flowPaths);
-
- // start the test
- replayAll();
-
- fm.init(context);
- ArrayList<IFlowPath> result = fm.getAllFlowsWithoutFlowEntries();
-
- // verify the test
- verifyAll();
- assertEquals(iFlowPath1, result.get(0));
- assertEquals(iFlowPath2, result.get(1));
-
- // TODO: does this method just return the replica of the flow paths?
- }
-
/**
* Test method for {@link FlowManager#reconcileFlow(IFlowPath, DataPath)}.
* @throws Exception
diff --git a/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java
new file mode 100644
index 0000000..4c66367
--- /dev/null
+++ b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java
@@ -0,0 +1,562 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import static org.junit.Assert.*;
+import static org.powermock.api.easymock.PowerMock.createMock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.floodlightcontroller.util.OFMessageDamper;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.util.CallerId;
+import net.onrc.onos.ofcontroller.util.DataPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction;
+import net.onrc.onos.ofcontroller.util.FlowEntryActions;
+import net.onrc.onos.ofcontroller.util.FlowEntryErrorState;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
+import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.FlowPathFlags;
+import net.onrc.onos.ofcontroller.util.FlowPathType;
+import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.Port;
+import net.onrc.onos.ofcontroller.util.SwitchPort;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.openflow.protocol.OFBarrierRequest;
+import org.openflow.protocol.OFFlowMod;
+import org.openflow.protocol.OFMatch;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.OFAction;
+import org.openflow.protocol.factory.BasicFactory;
+
+public class FlowPusherTest {
+ private FlowPusher pusher;
+ private FloodlightContext context;
+ private FloodlightModuleContext modContext;
+ private BasicFactory factory;
+ private OFMessageDamper damper;
+ private IFloodlightProviderService flservice;
+ private IThreadPoolService tpservice;
+
+ /**
+ * Test single OFMessage is correctly sent to single switch via MessageDamper.
+ */
+ @Test
+ public void testAddMessage() {
+ beginInitMock();
+
+ OFMessage msg = EasyMock.createMock(OFMessage.class);
+ EasyMock.expect(msg.getXid()).andReturn(1).anyTimes();
+ EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(msg);
+
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+ sw.flush();
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(sw);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context))).andReturn(true).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+
+ endInitMock();
+ initPusher(1);
+
+ boolean add_result = pusher.add(sw, msg);
+ assertTrue(add_result);
+
+ try {
+ // wait until message is processed.
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail("Failed in Thread.sleep()");
+ }
+
+ EasyMock.verify(msg);
+ EasyMock.verify(sw);
+
+ pusher.stop();
+ }
+
+ /**
+ * Test bunch of OFMessages are correctly sent to single switch via MessageDamper.
+ */
+ @Test
+ public void testMassiveAddMessage() {
+ final int NUM_MSG = 10000;
+
+ beginInitMock();
+
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+ sw.flush();
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.replay(sw);
+
+ List<OFMessage> messages = new ArrayList<OFMessage>();
+
+ for (int i = 0; i < NUM_MSG; ++i) {
+ OFMessage msg = EasyMock.createMock(OFMessage.class);
+ EasyMock.expect(msg.getXid()).andReturn(i).anyTimes();
+ EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(msg);
+ messages.add(msg);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
+ .andReturn(true).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+ }
+
+ endInitMock();
+ initPusher(1);
+
+ for (OFMessage msg : messages) {
+ boolean add_result = pusher.add(sw, msg);
+ assertTrue(add_result);
+ }
+
+ try {
+ // wait until message is processed.
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail("Failed in Thread.sleep()");
+ }
+
+ for (OFMessage msg : messages) {
+ EasyMock.verify(msg);
+ }
+ EasyMock.verify(sw);
+
+ pusher.stop();
+ }
+
+ /**
+ * Test bunch of OFMessages are correctly sent to multiple switches with single threads.
+ */
+ @Test
+ public void testMultiSwitchAddMessage() {
+ final int NUM_SWITCH = 10;
+ final int NUM_MSG = 100; // messages per thread
+
+ beginInitMock();
+
+ Map<IOFSwitch, List<OFMessage>> sw_map = new HashMap<IOFSwitch, List<OFMessage>>();
+ for (int i = 0; i < NUM_SWITCH; ++i) {
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)i).anyTimes();
+ sw.flush();
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.replay(sw);
+
+ List<OFMessage> messages = new ArrayList<OFMessage>();
+
+ for (int j = 0; j < NUM_MSG; ++j) {
+ OFMessage msg = EasyMock.createMock(OFMessage.class);
+ EasyMock.expect(msg.getXid()).andReturn(j).anyTimes();
+ EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(msg);
+ messages.add(msg);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
+ .andReturn(true).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+ }
+ sw_map.put(sw, messages);
+ }
+
+ endInitMock();
+ initPusher(1);
+
+ for (IOFSwitch sw : sw_map.keySet()) {
+ for (OFMessage msg : sw_map.get(sw)) {
+ boolean add_result = pusher.add(sw, msg);
+ assertTrue(add_result);
+ }
+ }
+
+ try {
+ // wait until message is processed.
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail("Failed in Thread.sleep()");
+ }
+
+ for (IOFSwitch sw : sw_map.keySet()) {
+ for (OFMessage msg : sw_map.get(sw)) {
+ EasyMock.verify(msg);
+ }
+
+ EasyMock.verify(sw);
+ }
+
+ pusher.stop();
+ }
+
+ /**
+ * Test bunch of OFMessages are correctly sent to multiple switches using multiple threads.
+ */
+ @Test
+ public void testMultiThreadedAddMessage() {
+ final int NUM_THREAD = 10;
+ final int NUM_MSG = 100; // messages per thread
+
+ beginInitMock();
+
+ Map<IOFSwitch, List<OFMessage>> sw_map = new HashMap<IOFSwitch, List<OFMessage>>();
+ for (int i = 0; i < NUM_THREAD; ++i) {
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)i).anyTimes();
+ sw.flush();
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.replay(sw);
+
+ List<OFMessage> messages = new ArrayList<OFMessage>();
+
+ for (int j = 0; j < NUM_MSG; ++j) {
+ OFMessage msg = EasyMock.createMock(OFMessage.class);
+ EasyMock.expect(msg.getXid()).andReturn(j).anyTimes();
+ EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(msg);
+ messages.add(msg);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
+ .andReturn(true).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+ }
+ sw_map.put(sw, messages);
+ }
+
+ endInitMock();
+ initPusher(NUM_THREAD);
+
+ for (IOFSwitch sw : sw_map.keySet()) {
+ for (OFMessage msg : sw_map.get(sw)) {
+ boolean add_result = pusher.add(sw, msg);
+ assertTrue(add_result);
+ }
+ }
+
+ try {
+ // wait until message is processed.
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail("Failed in Thread.sleep()");
+ }
+
+ for (IOFSwitch sw : sw_map.keySet()) {
+ for (OFMessage msg : sw_map.get(sw)) {
+ EasyMock.verify(msg);
+ }
+
+ EasyMock.verify(sw);
+ }
+
+ pusher.stop();
+ }
+
+ private long barrierTime = 0;
+ /**
+ * Test rate limitation of messages works correctly.
+ */
+ @Test
+ public void testRateLimitedAddMessage() {
+ final long LIMIT_RATE = 100; // [bytes/ms]
+ final int NUM_MSG = 1000;
+
+ // Accuracy of FlowPusher's rate calculation can't be measured by unit test
+ // because switch doesn't return BARRIER_REPLY.
+ // In unit test we use approximate way to measure rate. This value is
+ // acceptable margin of measured rate.
+ final double ACCEPTABLE_RATE = LIMIT_RATE * 1.2;
+
+ beginInitMock();
+
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+ sw.flush();
+ EasyMock.expectLastCall().atLeastOnce();
+ prepareBarrier(sw);
+ EasyMock.replay(sw);
+
+ List<OFMessage> messages = new ArrayList<OFMessage>();
+
+ for (int i = 0; i < NUM_MSG; ++i) {
+ OFMessage msg = EasyMock.createMock(OFMessage.class);
+ EasyMock.expect(msg.getXid()).andReturn(1).anyTimes();
+ EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.expect(msg.getLengthU()).andReturn(100).anyTimes();
+ EasyMock.replay(msg);
+ messages.add(msg);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
+ .andReturn(true).once();
+ } catch (IOException e) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+ }
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), (OFMessage)EasyMock.anyObject(), EasyMock.eq(context)))
+ .andAnswer(new IAnswer<Boolean>() {
+ @Override
+ public Boolean answer() throws Throwable {
+ OFMessage msg = (OFMessage)EasyMock.getCurrentArguments()[1];
+ if (msg.getType() == OFType.BARRIER_REQUEST) {
+ barrierTime = System.currentTimeMillis();
+ }
+ return true;
+ }
+ }).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+
+ endInitMock();
+ initPusher(1);
+
+ pusher.createQueue(sw);
+ pusher.setRate(sw, LIMIT_RATE);
+
+ long beginTime = System.currentTimeMillis();
+ for (OFMessage msg : messages) {
+ boolean add_result = pusher.add(sw, msg);
+ assertTrue(add_result);
+ }
+
+ pusher.barrierAsync(sw);
+
+ try {
+ do {
+ Thread.sleep(1000);
+ } while (barrierTime == 0);
+ } catch (InterruptedException e) {
+ fail("Failed to sleep");
+ }
+
+ double measured_rate = NUM_MSG * 100 / (barrierTime - beginTime);
+ assertTrue(measured_rate < ACCEPTABLE_RATE);
+
+ for (OFMessage msg : messages) {
+ EasyMock.verify(msg);
+ }
+ EasyMock.verify(sw);
+
+ pusher.stop();
+ }
+
+ /**
+ * Test barrier message is correctly sent to a switch.
+ */
+ @Test
+ public void testBarrierMessage() {
+ beginInitMock();
+
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+ sw.flush();
+ EasyMock.expectLastCall().atLeastOnce();
+ prepareBarrier(sw);
+ EasyMock.replay(sw);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), (OFMessage)EasyMock.anyObject(), EasyMock.eq(context)))
+ .andReturn(true).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+
+ endInitMock();
+ initPusher(1);
+
+ OFBarrierReplyFuture future = pusher.barrierAsync(sw);
+
+ assertNotNull(future);
+ pusher.stop();
+ }
+
+ /**
+ * Test FlowObject is correctly converted to message and is sent to a switch.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAddFlow() {
+ // Code below are copied from FlowManagerTest
+
+ // instantiate required objects
+ FlowEntry flowEntry1 = new FlowEntry();
+ flowEntry1.setDpid(new Dpid(1));
+ flowEntry1.setFlowId(new FlowId(1));
+ flowEntry1.setInPort(new Port((short) 1));
+ flowEntry1.setOutPort(new Port((short) 11));
+ flowEntry1.setFlowEntryId(new FlowEntryId(1));
+ flowEntry1.setFlowEntryMatch(new FlowEntryMatch());
+ flowEntry1.setFlowEntryActions(new FlowEntryActions());
+ flowEntry1.setFlowEntryErrorState(new FlowEntryErrorState());
+ flowEntry1.setFlowEntryUserState(FlowEntryUserState.FE_USER_ADD);
+
+ beginInitMock();
+
+ OFFlowMod msg = EasyMock.createMock(OFFlowMod.class);
+ EasyMock.expect(msg.setIdleTimeout(EasyMock.anyShort())).andReturn(msg);
+ EasyMock.expect(msg.setHardTimeout(EasyMock.anyShort())).andReturn(msg);
+ EasyMock.expect(msg.setPriority(EasyMock.anyShort())).andReturn(msg);
+ EasyMock.expect(msg.setBufferId(EasyMock.anyInt())).andReturn(msg);
+ EasyMock.expect(msg.setCookie(EasyMock.anyLong())).andReturn(msg);
+ EasyMock.expect(msg.setCommand(EasyMock.anyShort())).andReturn(msg);
+ EasyMock.expect(msg.setMatch(EasyMock.anyObject(OFMatch.class))).andReturn(msg);
+ EasyMock.expect(msg.setActions((List<OFAction>)EasyMock.anyObject())).andReturn(msg);
+ EasyMock.expect(msg.setLengthU(EasyMock.anyShort())).andReturn(msg);
+ EasyMock.expect(msg.setOutPort(EasyMock.anyShort())).andReturn(msg).atLeastOnce();
+ EasyMock.expect(msg.getXid()).andReturn(1).anyTimes();
+ EasyMock.expect(msg.getType()).andReturn(OFType.FLOW_MOD).anyTimes();
+ EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(msg);
+
+ EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.FLOW_MOD))).andReturn(msg);
+
+ ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
+ EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
+ (TimeUnit)EasyMock.anyObject())).andReturn(null).once();
+ EasyMock.replay(executor);
+ EasyMock.expect(tpservice.getScheduledExecutor()).andReturn(executor);
+
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+ EasyMock.expect(sw.getStringId()).andReturn("1").anyTimes();
+ sw.flush();
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(sw);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.anyObject(OFMessage.class), EasyMock.eq(context)))
+ .andAnswer(new IAnswer<Boolean>() {
+ @Override
+ public Boolean answer() throws Throwable {
+ OFMessage msg = (OFMessage)EasyMock.getCurrentArguments()[1];
+ assertEquals(msg.getType(), OFType.FLOW_MOD);
+ return true;
+ }
+ }).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+
+ endInitMock();
+ initPusher(1);
+
+ pusher.add(sw, flowEntry1);
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail("Failed to sleep");
+ }
+
+ EasyMock.verify(sw);
+
+ pusher.stop();
+ }
+
+ private void beginInitMock() {
+ context = EasyMock.createMock(FloodlightContext.class);
+ modContext = EasyMock.createMock(FloodlightModuleContext.class);
+ factory = EasyMock.createMock(BasicFactory.class);
+ damper = EasyMock.createMock(OFMessageDamper.class);
+ flservice = EasyMock.createMock(IFloodlightProviderService.class);
+ tpservice = EasyMock.createMock(IThreadPoolService.class);
+
+ EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IThreadPoolService.class)))
+ .andReturn(tpservice).once();
+ EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IFloodlightProviderService.class)))
+ .andReturn(flservice).once();
+ flservice.addOFMessageListener(EasyMock.eq(OFType.BARRIER_REPLY),
+ (FlowPusher) EasyMock.anyObject());
+ EasyMock.expectLastCall().once();
+ }
+
+ private void endInitMock() {
+ EasyMock.replay(tpservice);
+ EasyMock.replay(flservice);
+ EasyMock.replay(damper);
+ EasyMock.replay(factory);
+ EasyMock.replay(modContext);
+ EasyMock.replay(context);
+ }
+
+ private void initPusher(int num_thread) {
+ pusher = new FlowPusher(num_thread);
+ pusher.init(context, modContext, factory, damper);
+ pusher.start();
+ }
+
+ private void prepareBarrier(IOFSwitch sw) {
+ OFBarrierRequest req = EasyMock.createMock(OFBarrierRequest.class);
+ req.setXid(EasyMock.anyInt());
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(req.getXid()).andReturn(1).anyTimes();
+ EasyMock.expect(req.getType()).andReturn(OFType.BARRIER_REQUEST).anyTimes();
+ EasyMock.expect(req.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(req);
+ EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.BARRIER_REQUEST))).andReturn(req);
+
+ ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
+ EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
+ (TimeUnit)EasyMock.anyObject())).andReturn(null).once();
+ EasyMock.replay(executor);
+ EasyMock.expect(tpservice.getScheduledExecutor()).andReturn(executor);
+
+ EasyMock.expect(sw.getNextTransactionId()).andReturn(1);
+ }
+
+ // Copied from FlowManagerTest
+ private IFlowPath createIFlowPathMock(long flowId, String installerID,
+ String flowPathType, String flowPathUserState,
+ long flowPathFlags, long srcDpid, int srcPort,
+ long dstDpid, int dstPort) {
+ IFlowPath iFlowPath = EasyMock.createNiceMock(IFlowPath.class);
+ EasyMock.expect(iFlowPath.getFlowId()).andReturn(new FlowId(flowId).toString()).anyTimes();
+ EasyMock.expect(iFlowPath.getInstallerId()).andReturn(installerID).anyTimes();
+ EasyMock.expect(iFlowPath.getFlowPathType()).andReturn(flowPathType).anyTimes();
+ EasyMock.expect(iFlowPath.getFlowPathUserState()).andReturn(flowPathUserState).anyTimes();
+ EasyMock.expect(iFlowPath.getFlowPathFlags()).andReturn(new Long(flowPathFlags)).anyTimes();
+ EasyMock.expect(iFlowPath.getSrcSwitch()).andReturn(new Dpid(srcDpid).toString()).anyTimes();
+ EasyMock.expect(iFlowPath.getSrcPort()).andReturn(new Short((short)srcPort)).anyTimes();
+ EasyMock.expect(iFlowPath.getDstSwitch()).andReturn(new Dpid(dstDpid).toString()).anyTimes();
+ EasyMock.expect(iFlowPath.getDstPort()).andReturn(new Short((short)dstPort)).anyTimes();
+ return iFlowPath;
+ }
+
+}
diff --git a/web/clear_flow.py b/web/clear_flow.py
deleted file mode 100755
index db70d40..0000000
--- a/web/clear_flow.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#! /usr/bin/env python
-# -*- Mode: python; py-indent-offset: 4; tab-width: 8; indent-tabs-mode: t; -*-
-
-import pprint
-import os
-import sys
-import subprocess
-import json
-import argparse
-import io
-import time
-
-from flask import Flask, json, Response, render_template, make_response, request
-
-#
-# TODO: remove this! We don't use JSON argument here!
-# curl http://127.0.0.1:8080/wm/flow/clear/{"value":"0xf"}/json'
-#
-
-## Global Var ##
-ControllerIP="127.0.0.1"
-ControllerPort=8080
-
-DEBUG=0
-pp = pprint.PrettyPrinter(indent=4)
-
-app = Flask(__name__)
-
-## Worker Functions ##
-def log_error(txt):
- print '%s' % (txt)
-
-def debug(txt):
- if DEBUG:
- print '%s' % (txt)
-
-# @app.route("/wm/flow/clear/<flow-id>/json")
-def clear_flow_path(flow_id):
- command = "curl -s \"http://%s:%s/wm/flow/clear/%s/json\"" % (ControllerIP, ControllerPort, flow_id)
- debug("clear_flow_path %s" % command)
- result = os.popen(command).read()
- debug("result %s" % result)
- # parsedResult = json.loads(result)
- # debug("parsed %s" % parsedResult)
-
-if __name__ == "__main__":
- usage_msg = "Clear flow state from the ONOS Network Map\n"
- usage_msg = usage_msg + "Usage: %s <begin-flow-id> <end-flow-id>\n" % (sys.argv[0])
- usage_msg = usage_msg + " %s <flow-id>\n" % (sys.argv[0])
- usage_msg = usage_msg + "\n"
- usage_msg = usage_msg + " Arguments:\n"
- usage_msg = usage_msg + " <begin-flow-id> <end-flow-id> Clear all flows in the flow ID range\n"
- usage_msg = usage_msg + " <flow-id> Clear a single flow with the flow ID\n"
- usage_msg = usage_msg + " all Clear all flows\n"
-
- # app.debug = False;
-
- # Usage info
- if len(sys.argv) > 1 and (sys.argv[1] == "-h" or sys.argv[1] == "--help"):
- print(usage_msg)
- exit(0)
-
- # Check arguments
- if len(sys.argv) < 2:
- log_error(usage_msg)
- exit(1)
-
- if (sys.argv[1] == "all"):
- clear_flow_path(sys.argv[1])
- else:
- begin_flow_id = int(sys.argv[1], 0)
- if len(sys.argv) >= 3:
- end_flow_id = int(sys.argv[2], 0)
- else:
- end_flow_id = begin_flow_id
-
- # Do the work
- flow_id = begin_flow_id
- while flow_id <= end_flow_id:
- clear_flow_path(flow_id)
- flow_id = flow_id + 1
diff --git a/web/ons-demo/js/app.js b/web/ons-demo/js/app.js
index 94c41e2..d869de7 100644
--- a/web/ons-demo/js/app.js
+++ b/web/ons-demo/js/app.js
@@ -1,6 +1,17 @@
/*global d3, document∆*/
+function updateFlow(model) {
+ model.flows.forEach(function (flow) {
+ flow.flowId = flow.flowId.value;
+ flow.installerId = flow.installerId.value;
+ flow.dstDpid = flow.dataPath.dstPort.dpid.value;
+ flow.srcDpid = flow.dataPath.srcPort.dpid.value;
+ flow.dstPort = flow.dataPath.dstPort.port.value;
+ flow.srcPort = flow.dataPath.srcPort.port.value;
+ });
+}
+
function sync() {
var d = Date.now();
@@ -8,6 +19,7 @@
// console.log('Update time: ' + (Date.now() - d)/1000 + 's');
if (newModel) {
+ updateFlow(newModel);
var modelChanged = false;
var newModelString = JSON.stringify(newModel);
if (!modelString || newModelString != modelString) {
diff --git a/web/pusher.py b/web/pusher.py
new file mode 100755
index 0000000..2a3528b
--- /dev/null
+++ b/web/pusher.py
@@ -0,0 +1,152 @@
+#! /usr/bin/env python
+
+
+import pprint
+import os
+import sys
+import subprocess
+import json
+import argparse
+import io
+import time
+
+from flask import Flask, json, Response, render_template, make_response, request
+
+## Global Var ##
+ControllerIP="127.0.0.1"
+ControllerPort=8080
+
+DEBUG=0
+pp = pprint.PrettyPrinter(indent=4)
+
+app = Flask(__name__)
+
+## Worker Functions ##
+def log_error(txt):
+ print '%s' % (txt)
+
+def debug(txt):
+ if DEBUG:
+ print '%s' % (txt)
+
+# @app.route("/wm/fprog/pusher/setrate/<dpid>/<rate>/json")
+# Sample output:
+# "true"
+def set_rate(dpid,rate):
+ try:
+ command = "curl -s \"http://%s:%s/wm/fprog/pusher/setrate/%s/%s/json\"" % (ControllerIP, ControllerPort, dpid, rate)
+ debug("set_rate %s" % command)
+
+ result = os.popen(command).read()
+ debug("result %s" % result)
+ if result == "false":
+ print "Failed to set rate"
+ return;
+ except:
+ log_error("Controller IF has issue")
+ exit(1)
+
+ print "Sending rate to %s is successfully set to %s" % (dpid, rate)
+
+# @app.route("/wm/fprog/pusher/suspend/<dpid>/json")
+# Sample output:
+# "true"
+def suspend(dpid):
+ try:
+ command = "curl -s \"http://%s:%s/wm/fprog/pusher/suspend/%s/json\"" % (ControllerIP, ControllerPort, dpid)
+ debug("suspend %s" % command)
+
+ result = os.popen(command).read()
+ debug("result %s" % result)
+ if result == "false":
+ print "Failed to suspend"
+ return;
+ except:
+ log_error("Controller IF has issue")
+ exit(1)
+
+ print "DPID %s is successfully suspended" % dpid
+
+# @app.route("/wm/fprog/pusher/resume/<dpid>/json")
+# Sample output:
+# "true"
+def resume(dpid):
+ try:
+ command = "curl -s \"http://%s:%s/wm/fprog/pusher/resume/%s/json\"" % (ControllerIP, ControllerPort, dpid)
+ debug("resume %s" % command)
+
+ result = os.popen(command).read()
+ debug("result %s" % result)
+ if result == "false":
+ print "Failed to resume"
+ return;
+ except:
+ log_error("Controller IF has issue")
+ exit(1)
+
+ print "DPID %s is successfully resumed" % dpid
+
+# @app.route("/wm/fprog/pusher/barrier/<dpid>/json")
+# Sample output:
+# "{"version":1,"type":"BARRIER_REPLY","length":8,"xid":4,"lengthU":8}"
+def barrier(dpid):
+ try:
+ command = "curl -s \"http://%s:%s/wm/fprog/pusher/barrier/%s/json\"" % (ControllerIP, ControllerPort, dpid)
+ debug("barrier %s" % command)
+
+ result = os.popen(command).read()
+ debug("result %s" % result)
+ if result == "false":
+ print "Failed to send barrier"
+ return;
+ except:
+ log_error("Controller IF has issue")
+ exit(1)
+
+ print "Barrier reply from %s : %s" % (dpid, result)
+
+
+if __name__ == "__main__":
+ usage_msg1 = "Usage:\n"
+ usage_msg2 = "%s rate <dpid> <rate> : Set sending rate[bytes/ms] to the switch\n" % (sys.argv[0])
+ usage_msg3 = " suspend <dpid> : Suspend sending message to the switch\n"
+ usage_msg4 = " resume <dpid> : Resume sending message to the switch\n"
+ usage_msg5 = " barrier <dpid> : Send barrier message to the switch\n"
+ usage_msg = usage_msg1 + usage_msg2 + usage_msg3 + usage_msg4 + usage_msg5;
+
+ app.debug = True;
+
+ # Usage info
+ if len(sys.argv) > 1 and (sys.argv[1] == "-h" or sys.argv[1] == "--help"):
+ print(usage_msg)
+ exit(0)
+
+ # Check arguments
+ if len(sys.argv) < 2:
+ log_error(usage_msg)
+ exit(1)
+
+ # Do the work
+ if sys.argv[1] == "rate":
+ if len(sys.argv) < 4:
+ log_error(usage_msg)
+ exit(1)
+ set_rate(sys.argv[2], sys.argv[3])
+ elif sys.argv[1] == "suspend":
+ if len(sys.argv) < 3:
+ log_error(usage_msg)
+ exit(1)
+ suspend(sys.argv[2])
+ elif sys.argv[1] == "resume":
+ if len(sys.argv) < 3:
+ log_error(usage_msg)
+ exit(1)
+ resume(sys.argv[2])
+ elif sys.argv[1] == "barrier":
+ if len(sys.argv) < 3:
+ log_error(usage_msg)
+ exit(1)
+ barrier(sys.argv[2])
+ else:
+ log_error(usage_msg)
+ exit(1)
diff --git a/web/topology_rest.py b/web/topology_rest.py
index bac3113..b3a415e 100755
--- a/web/topology_rest.py
+++ b/web/topology_rest.py
@@ -720,8 +720,8 @@
# print "Debug: Controller command %s called %s" % (cmd, controller_name)
else:
# No longer use -i to specify keys (use .ssh/config to specify it)
- start_onos="ssh %s ONOS/start-onos.sh start" % (controller_name)
- stop_onos="ssh %s ONOS/start-onos.sh stop" % (controller_name)
+ start_onos="ssh %s \"cd ONOS; ./start-onos.sh start\"" % (controller_name)
+ stop_onos="ssh %s \"cd ONOS; ./start-onos.sh stop\"" % (controller_name)
# start_onos="ssh -i ~/.ssh/onlabkey.pem %s ONOS/start-onos.sh start" % (controller_name)
# stop_onos="ssh -i ~/.ssh/onlabkey.pem %s ONOS/start-onos.sh stop" % (controller_name)
@@ -960,7 +960,7 @@
parsedResult = json.loads(result)
if len(parsedResult) > 0:
if parsedResult[-1].has_key('flowId'):
- flow_nr = int(parsedResult[-1]['flowId'], 16)
+ flow_nr = int(parsedResult[-1]['flowId']['value'], 16)
else:
flow_nr = -1 # first flow
print "first flow"