Merge from master
diff --git a/README.md b/README.md
index b71b9aa..9f2c040 100644
--- a/README.md
+++ b/README.md
@@ -15,12 +15,12 @@
0. Install custom jars and dependencies (needs to be run only once)
- $ ./setup-local-maven.sh
+ $ ./setup-local-maven.sh
1. Cleanly build ONOS
- $ mvn clean
- $ mvn compile
+ $ mvn clean
+ $ mvn compile
NOTE: installing maven for the first time may switch java version
from 1.7 to 1.6. This might prevent Cassandra to run.
@@ -28,17 +28,19 @@
Dependencies
------------
1. Zookeeper
+
Download and install apache-zookeeper-3.4.5:
http://zookeeper.apache.org/releases.html
- Edit file (ONOS-INSTALL-DIR)/start-zk.sh and set variable "ZK_DIR"
+ Edit file `(ONOS-INSTALL-DIR)/start-zk.sh` and set variable "ZK_DIR"
to point to the Zookeeper directory.
2. Cassandra
+
Download and install apache-cassandra-1.2.4:
http://cassandra.apache.org/download/
- Edit file (ONOS-INSTALL-DIR)/start-cassandra.sh and set variable
+ Edit file `(ONOS-INSTALL-DIR)/start-cassandra.sh` and set variable
"CASSANDRA_DIR" to point to the Cassandra directory.
Running ONOS with Cassandra as a separate process
@@ -47,62 +49,62 @@
1. Start Zookeeper
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-zk.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-zk.sh start
- ## Confirm Zookeeper is running:
- $ ./start.zk.sh status
+ ## Confirm Zookeeper is running:
+ $ ./start-zk.sh status
2. Start Cassandra
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-cassandra.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-cassandra.sh start
- ## Confirm Cassandra is running:
- $ ./start-cassandra.sh status
+ ## Confirm Cassandra is running:
+ $ ./start-cassandra.sh status
3. Start ONOS
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-onos.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-onos.sh start
- ## Confirm ONOS is running:
- $ ./start-onos.sh status
-
+ ## Confirm ONOS is running:
+ $ ./start-onos.sh status
+
4. Start ONOS REST API server
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-rest.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-rest.sh start
- ## Confirm the REST API server is running:
- $ ./start-rest.sh status
+ ## Confirm the REST API server is running:
+ $ ./start-rest.sh status
Running ONOS with Cassandra embedded (Optional)
-----------------------------------------------
1. Start Zookeeper
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-zk.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-zk.sh start
- ## Confirm Zookeeper is running:
- $ ./start.zk.sh status
-
+ ## Confirm Zookeeper is running:
+ $ ./start-zk.sh status
+
2. Start ONOS and Cassandra embedded
- $ cd (ONOS-INSTALL_DIR)/
- $ ./start-onos-embedded.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-onos-embedded.sh start
- ## Confirm ONOS is running:
- $ ./start-onos-embedded.sh status
-
+ ## Confirm ONOS is running:
+ $ ./start-onos-embedded.sh status
+
3. Start ONOS REST API server
- $ cd (ONOS-INSTALL-DIR)/
- $ ./start-rest.sh start
+ $ cd (ONOS-INSTALL-DIR)/
+ $ ./start-rest.sh start
- ## Confirm the REST API server is running:
- $ ./start-rest.sh status
+ ## Confirm the REST API server is running:
+ $ ./start-rest.sh status
Running in offline mode (Optional)
@@ -110,9 +112,9 @@
Maven is used to build and run ONOS. By default, maven tries to reach
the repositories. The '-o' option can be given to the 'mvn' command to
-suppress this behavior. The MVN environmental variable can be used to
+suppress this behavior. The `MVN` environment variable can be used to
set additional options to the 'mvn' command used in ONOS.
* Example: Running in offline mode
- $ env MVN="mvn -o" ./start-onos.sh start
+ $ env MVN="mvn -o" ./start-onos.sh start
diff --git a/src/main/java/net/floodlightcontroller/core/FloodlightProvider.java b/src/main/java/net/floodlightcontroller/core/FloodlightProvider.java
index 95daf96..4d85b7d 100644
--- a/src/main/java/net/floodlightcontroller/core/FloodlightProvider.java
+++ b/src/main/java/net/floodlightcontroller/core/FloodlightProvider.java
@@ -15,6 +15,7 @@
import net.floodlightcontroller.restserver.IRestApiService;
import net.floodlightcontroller.storage.IStorageSourceService;
import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryService;
import net.onrc.onos.registry.controller.IControllerRegistryService;
public class FloodlightProvider implements IFloodlightModule {
@@ -52,6 +53,7 @@
dependencies.add(IThreadPoolService.class);
// Following added by ONOS
dependencies.add(IControllerRegistryService.class);
+ dependencies.add(ILinkDiscoveryService.class);
return dependencies;
}
@@ -71,6 +73,8 @@
// Following added by ONOS
controller.setMastershipService(
context.getServiceImpl(IControllerRegistryService.class));
+ controller.setLinkDiscoveryService(
+ context.getServiceImpl(ILinkDiscoveryService.class));
controller.init(context.getConfigParams(this));
}
diff --git a/src/main/java/net/floodlightcontroller/core/internal/Controller.java b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
index 17f1be8..31f80cc 100644
--- a/src/main/java/net/floodlightcontroller/core/internal/Controller.java
+++ b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
@@ -75,6 +75,7 @@
import net.floodlightcontroller.storage.StorageException;
import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
+import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryService;
import net.onrc.onos.registry.controller.IControllerRegistryService;
import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
import net.onrc.onos.registry.controller.RegistryException;
@@ -189,6 +190,8 @@
protected IThreadPoolService threadPool;
protected IControllerRegistryService registryService;
+ protected ILinkDiscoveryService linkDiscovery;
+
// Configuration options
protected int openFlowPort = 6633;
protected int workerThreads = 0;
@@ -407,6 +410,10 @@
this.registryService = serviceImpl;
}
+ public void setLinkDiscoveryService(ILinkDiscoveryService linkDiscovery) {
+ this.linkDiscovery = linkDiscovery;
+ }
+
@Override
public Role getRole() {
synchronized(roleChanger) {
@@ -1298,6 +1305,12 @@
updatePortInfo(sw, port);
log.debug("Port #{} modified for {}", portNumber, sw);
} else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
+ // XXX Workaround to prevent race condition where a link is detected
+ // and attempted to be written to the database before the port is in
+ // the database. We now suppress link discovery on ports until we're
+ // sure they're in the database.
+ linkDiscovery.AddToSuppressLLDPs(sw.getId(), port.getPortNumber());
+
sw.setPort(port);
SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
try {
@@ -1541,6 +1554,14 @@
"network problem that can be ignored."
)
protected void addSwitch(IOFSwitch sw) {
+ // XXX Workaround to prevent race condition where a link is detected
+ // and attempted to be written to the database before the port is in
+ // the database. We now suppress link discovery on ports until we're
+ // sure they're in the database.
+ for (OFPhysicalPort port : sw.getPorts()) {
+ linkDiscovery.AddToSuppressLLDPs(sw.getId(), port.getPortNumber());
+ }
+
// TODO: is it safe to modify the HashMap without holding
// the old switch's lock?
OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/INetMapTopologyObjects.java b/src/main/java/net/onrc/onos/ofcontroller/core/INetMapTopologyObjects.java
index 869333b..49ffd4e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/INetMapTopologyObjects.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/INetMapTopologyObjects.java
@@ -117,6 +117,7 @@
@Adjacency(label="host")
public void removeDevice(final IDeviceObject device);
+
@JsonIgnore
@Adjacency(label="inport",direction = Direction.IN)
public Iterable<IFlowEntry> getInFlowEntries();
@@ -125,6 +126,7 @@
@Adjacency(label="outport",direction = Direction.IN)
public Iterable<IFlowEntry> getOutFlowEntries();
+
@JsonIgnore
@Adjacency(label="link")
public Iterable<IPortObject> getLinkedPorts();
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
index 6ee3c46..182a20a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
@@ -77,27 +77,34 @@
@Override
public IDeviceObject addDevice(IDevice device) {
IDeviceObject obj = null;
- try {
- if ((obj = ope.searchDevice(device.getMACAddressString())) != null) {
- log.debug("Adding device {}: found existing device", device.getMACAddressString());
- } else {
- obj = ope.newDevice();
- log.debug("Adding device {}: creating new device", device.getMACAddressString());
+ for (int i = 0; i < 6; i++) {
+ try {
+ if (i > 0) {
+ log.debug("Retrying add device: i is {}", i);
+ }
+ if ((obj = ope.searchDevice(device.getMACAddressString())) != null) {
+ log.debug("Adding device {}: found existing device", device.getMACAddressString());
+ } else {
+ obj = ope.newDevice();
+ log.debug("Adding device {}: creating new device", device.getMACAddressString());
+ }
+
+ changeDeviceAttachments(device, obj);
+
+ changeDeviceIpv4Addresses(device, obj);
+
+ obj.setMACAddress(device.getMACAddressString());
+ obj.setType("device");
+ obj.setState("ACTIVE");
+ ope.commit();
+
+ break;
+ //log.debug("Adding device {}",device.getMACAddressString());
+ } catch (TitanException e) {
+ ope.rollback();
+ log.error("Adding device {} failed", device.getMACAddressString(), e);
+ obj = null;
}
- changeDeviceAttachments(device, obj);
-
- changeDeviceIpv4Addresses(device, obj);
-
- obj.setMACAddress(device.getMACAddressString());
- obj.setType("device");
- obj.setState("ACTIVE");
- ope.commit();
-
- //log.debug("Adding device {}",device.getMACAddressString());
- } catch (TitanException e) {
- ope.rollback();
- log.error("Adding device {} failed", device.getMACAddressString(), e);
- obj = null;
}
return obj;
@@ -274,8 +281,10 @@
private void changeDeviceIpv4Addresses(IDevice device, IDeviceObject deviceObject) {
List<String> dbIpv4Addresses = new ArrayList<String>();
+ List<Integer> intDbIpv4Addresses = new ArrayList<Integer>();
for (IIpv4Address ipv4Vertex : deviceObject.getIpv4Addresses()) {
dbIpv4Addresses.add(InetAddresses.fromInteger(ipv4Vertex.getIpv4Address()).getHostAddress());
+ intDbIpv4Addresses.add(ipv4Vertex.getIpv4Address());
}
List<String> memIpv4Addresses = new ArrayList<String>();
@@ -287,13 +296,16 @@
memIpv4Addresses, dbIpv4Addresses);
for (int ipv4Address : device.getIPv4Addresses()) {
- if (deviceObject.getIpv4Address(ipv4Address) == null) {
+ //if (deviceObject.getIpv4Address(ipv4Address) == null) {
+ if (!intDbIpv4Addresses.contains(ipv4Address)) {
IIpv4Address dbIpv4Address = ope.ensureIpv4Address(ipv4Address);
+ /*
IDeviceObject oldDevice = dbIpv4Address.getDevice();
if (oldDevice != null) {
oldDevice.removeIpv4Address(dbIpv4Address);
}
+ */
log.debug("Adding IP address {}",
InetAddresses.fromInteger(ipv4Address).getHostAddress());
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/web/TopoSwitchesResource.java b/src/main/java/net/onrc/onos/ofcontroller/core/web/TopoSwitchesResource.java
index a173a70..6d3f161 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/web/TopoSwitchesResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/web/TopoSwitchesResource.java
@@ -17,12 +17,12 @@
String filter = (String) getRequestAttributes().get("filter");
if (filter.equals("active")) {
- return (Iterator<ISwitchObject>) impl.getActiveSwitches().iterator();
+ return impl.getActiveSwitches().iterator();
}
if (filter.equals("inactive")) {
- return (Iterator<ISwitchObject>) impl.getInactiveSwitches().iterator();
+ return impl.getInactiveSwitches().iterator();
} else {
- return (Iterator<ISwitchObject>) impl.getAllSwitches().iterator();
+ return impl.getAllSwitches().iterator();
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
index c1dbe54..c94acd5 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
@@ -248,10 +248,13 @@
// Publish: add the ports
// TODO: Add only ports that are UP?
for (OFPhysicalPort port : sw.getPorts()) {
- TopologyElement topologyElementPort =
- new TopologyElement(sw.getId(),
- port.getPortNumber());
- datagridService.notificationSendTopologyElementAdded(topologyElementPort);
+ TopologyElement topologyElementPort =
+ new TopologyElement(sw.getId(), port.getPortNumber());
+ datagridService.notificationSendTopologyElementAdded(topologyElementPort);
+
+ // Allow links to be discovered on this port now that it's
+ // in the database
+ linkDiscovery.RemoveFromSuppressLLDPs(sw.getId(), port.getPortNumber());
}
// Add all links that might be connected already
@@ -320,6 +323,10 @@
@Override
public void switchPortAdded(Long switchId, OFPhysicalPort port) {
if (swStore.addPort(HexString.toHexString(switchId), port)) {
+ // Allow links to be discovered on this port now that it's
+ // in the database
+ linkDiscovery.RemoveFromSuppressLLDPs(switchId, port.getPortNumber());
+
// TODO publish ADD_PORT event here
TopologyElement topologyElement =
new TopologyElement(switchId, port.getPortNumber());
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index 1bea72d..da40bef 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -3,7 +3,6 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
@@ -24,21 +23,17 @@
/**
* Class for performing Flow-related operations on the Database.
*/
-class FlowDatabaseOperation {
+public class FlowDatabaseOperation {
private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
/**
* Add a flow.
*
- * @param flowManager the Flow Manager to use.
* @param dbHandler the Graph Database handler to use.
* @param flowPath the Flow Path to install.
- * @param flowId the return-by-reference Flow ID as assigned internally.
* @return true on success, otherwise false.
*/
- static boolean addFlow(FlowManager flowManager,
- DBOperation dbHandler,
- FlowPath flowPath, FlowId flowId) {
+ static boolean addFlow(DBOperation dbHandler, FlowPath flowPath) {
IFlowPath flowObj = null;
boolean found = false;
try {
@@ -68,6 +63,21 @@
}
//
+ // Remove the old Flow Entries
+ //
+ if (found) {
+ Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
+ LinkedList<IFlowEntry> deleteFlowEntries =
+ new LinkedList<IFlowEntry>();
+ for (IFlowEntry flowEntryObj : flowEntries)
+ deleteFlowEntries.add(flowEntryObj);
+ for (IFlowEntry flowEntryObj : deleteFlowEntries) {
+ flowObj.removeFlowEntry(flowEntryObj);
+ dbHandler.removeFlowEntry(flowEntryObj);
+ }
+ }
+
+ //
// Set the Flow key:
// - flowId
//
@@ -155,45 +165,33 @@
// flowPath.dataPath().flowEntries()
//
for (FlowEntry flowEntry : flowPath.dataPath().flowEntries()) {
- if (addFlowEntry(flowManager, dbHandler, flowObj, flowEntry) == null) {
+ if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE)
+ continue; // Skip: all Flow Entries were deleted earlier
+
+ if (addFlowEntry(dbHandler, flowObj, flowEntry) == null) {
dbHandler.rollback();
return false;
}
}
dbHandler.commit();
- //
- // TODO: We need a proper Flow ID allocation mechanism.
- //
- flowId.setValue(flowPath.flowId().value());
-
return true;
}
/**
* Add a flow entry to the Network MAP.
*
- * @param flowManager the Flow Manager to use.
* @param dbHandler the Graph Database handler to use.
* @param flowObj the corresponding Flow Path object for the Flow Entry.
* @param flowEntry the Flow Entry to install.
* @return the added Flow Entry object on success, otherwise null.
*/
- static IFlowEntry addFlowEntry(FlowManager flowManager,
- DBOperation dbHandler,
+ static IFlowEntry addFlowEntry(DBOperation dbHandler,
IFlowPath flowObj,
FlowEntry flowEntry) {
// Flow edges
// HeadFE (TODO)
- //
- // Assign the FlowEntry ID.
- //
- if (! flowEntry.isValidFlowEntryId()) {
- long id = flowManager.getNextFlowEntryId();
- flowEntry.setFlowEntryId(new FlowEntryId(id));
- }
-
IFlowEntry flowEntryObj = null;
boolean found = false;
try {
@@ -497,137 +495,6 @@
}
/**
- * Get all previously added flows by a specific installer for a given
- * data path endpoints.
- *
- * @param dbHandler the Graph Database handler to use.
- * @param installerId the Caller ID of the installer of the flow to get.
- * @param dataPathEndpoints the data path endpoints of the flow to get.
- * @return the Flow Paths if found, otherwise null.
- */
- static ArrayList<FlowPath> getAllFlows(DBOperation dbHandler,
- CallerId installerId,
- DataPathEndpoints dataPathEndpoints) {
- //
- // TODO: The implementation below is not optimal:
- // We fetch all flows, and then return only the subset that match
- // the query conditions.
- // We should use the appropriate Titan/Gremlin query to filter-out
- // the flows as appropriate.
- //
- ArrayList<FlowPath> allFlows = getAllFlows(dbHandler);
- ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
-
- if (allFlows == null)
- return flowPaths;
-
- for (FlowPath flow : allFlows) {
- //
- // TODO: String-based comparison is sub-optimal.
- // We are using it for now to save us the extra work of
- // implementing the "equals()" and "hashCode()" methods.
- //
- if (! flow.installerId().toString().equals(installerId.toString()))
- continue;
- if (! flow.dataPath().srcPort().toString().equals(dataPathEndpoints.srcPort().toString())) {
- continue;
- }
- if (! flow.dataPath().dstPort().toString().equals(dataPathEndpoints.dstPort().toString())) {
- continue;
- }
- flowPaths.add(flow);
- }
-
- return flowPaths;
- }
-
- /**
- * Get all installed flows by all installers for given data path endpoints.
- *
- * @param dbHandler the Graph Database handler to use.
- * @param dataPathEndpoints the data path endpoints of the flows to get.
- * @return the Flow Paths if found, otherwise null.
- */
- static ArrayList<FlowPath> getAllFlows(DBOperation dbHandler,
- DataPathEndpoints dataPathEndpoints) {
- //
- // TODO: The implementation below is not optimal:
- // We fetch all flows, and then return only the subset that match
- // the query conditions.
- // We should use the appropriate Titan/Gremlin query to filter-out
- // the flows as appropriate.
- //
- ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
- ArrayList<FlowPath> allFlows = getAllFlows(dbHandler);
-
- if (allFlows == null)
- return flowPaths;
-
- for (FlowPath flow : allFlows) {
- //
- // TODO: String-based comparison is sub-optimal.
- // We are using it for now to save us the extra work of
- // implementing the "equals()" and "hashCode()" methods.
- //
- if (! flow.dataPath().srcPort().toString().equals(dataPathEndpoints.srcPort().toString())) {
- continue;
- }
- if (! flow.dataPath().dstPort().toString().equals(dataPathEndpoints.dstPort().toString())) {
- continue;
- }
- flowPaths.add(flow);
- }
-
- return flowPaths;
- }
-
- /**
- * Get summary of all installed flows by all installers in a given range.
- *
- * @param dbHandler the Graph Database handler to use.
- * @param flowId the Flow ID of the first flow in the flow range to get.
- * @param maxFlows the maximum number of flows to be returned.
- * @return the Flow Paths if found, otherwise null.
- */
- static ArrayList<FlowPath> getAllFlowsSummary(DBOperation dbHandler,
- FlowId flowId,
- int maxFlows) {
- //
- // TODO: The implementation below is not optimal:
- // We fetch all flows, and then return only the subset that match
- // the query conditions.
- // We should use the appropriate Titan/Gremlin query to filter-out
- // the flows as appropriate.
- //
- ArrayList<FlowPath> flowPaths = getAllFlowsWithDataPathSummary(dbHandler);
- Collections.sort(flowPaths);
- return flowPaths;
- }
-
- /**
- * Get all Flows information, with Data Path summary for the Flow Entries.
- *
- * @param dbHandler the Graph Database handler to use.
- * @return all Flows information, with Data Path summary for the Flow
- * Entries.
- */
- static ArrayList<FlowPath> getAllFlowsWithDataPathSummary(DBOperation dbHandler) {
- ArrayList<FlowPath> flowPaths = getAllFlows(dbHandler);
-
- // Truncate each Flow Path and Flow Entry
- for (FlowPath flowPath : flowPaths) {
- flowPath.setFlowEntryMatch(null);
- flowPath.setFlowEntryActions(null);
- for (FlowEntry flowEntry : flowPath.flowEntries()) {
- flowEntry.setFlowEntryMatch(null);
- flowEntry.setFlowEntryActions(null);
- }
- }
-
- return flowPaths;
- }
-
- /**
* Extract Flow Path State from a Titan Database Object @ref IFlowPath.
*
* @param flowObj the object to extract the Flow Path State from.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index eccf40b..6c200fa 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -7,6 +7,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -27,24 +29,14 @@
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
+import com.esotericsoftware.kryo2.Kryo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A class for storing a pair of Flow Path and a Flow Entry.
- */
-class FlowPathEntryPair {
- protected FlowPath flowPath;
- protected FlowEntry flowEntry;
-
- protected FlowPathEntryPair(FlowPath flowPath, FlowEntry flowEntry) {
- this.flowPath = flowPath;
- this.flowEntry = flowEntry;
- }
-}
-
-/**
* Class for FlowPath Maintenance.
* This class listens for FlowEvents to:
* - Maintain a local cache of the Network Topology.
@@ -58,9 +50,7 @@
private FlowManager flowManager; // The Flow Manager to use
private IDatagridService datagridService; // The Datagrid Service to use
private Topology topology; // The network topology
- private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
- private Map<Long, FlowEntry> unmatchedFlowEntryAdd =
- new HashMap<Long, FlowEntry>();
+ private KryoFactory kryoFactory = new KryoFactory();
// The queue with Flow Path and Topology Element updates
private BlockingQueue<EventEntry<?>> networkEvents =
@@ -74,21 +64,22 @@
private List<EventEntry<FlowEntry>> flowEntryEvents =
new LinkedList<EventEntry<FlowEntry>>();
+ // All internally computed Flow Paths
+ private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
+
+ // The Flow Entries received as notifications with unmatched Flow Paths
+ private Map<Long, FlowEntry> unmatchedFlowEntryAdd =
+ new HashMap<Long, FlowEntry>();
+
//
// Transient state for processing the Flow Paths:
- // - The new Flow Paths
// - The Flow Paths that should be recomputed
// - The Flow Paths with modified Flow Entries
- // - The Flow Entries that were updated
//
- private List<FlowPath> newFlowPaths = new LinkedList<FlowPath>();
- private List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
- private List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
- private List<FlowPathEntryPair> updatedFlowEntries =
- new LinkedList<FlowPathEntryPair>();
- private List<FlowPathEntryPair> unmatchedDeleteFlowEntries =
- new LinkedList<FlowPathEntryPair>();
-
+ private Map<Long, FlowPath> shouldRecomputeFlowPaths =
+ new HashMap<Long, FlowPath>();
+ private Map<Long, FlowPath> modifiedFlowPaths =
+ new HashMap<Long, FlowPath>();
/**
* Constructor for a given Flow Manager and Datagrid Service.
@@ -144,7 +135,9 @@
}
// Process the initial events (if any)
- processEvents();
+ synchronized (allFlowPaths) {
+ processEvents();
+ }
}
/**
@@ -198,7 +191,9 @@
collection.clear();
// Process the events (if any)
- processEvents();
+ synchronized (allFlowPaths) {
+ processEvents();
+ }
}
} catch (Exception exception) {
log.debug("Exception processing Network Events: ", exception);
@@ -209,7 +204,7 @@
* Process the events (if any)
*/
private void processEvents() {
- List<FlowPathEntryPair> modifiedFlowEntries;
+ Collection<FlowEntry> modifiedFlowEntries;
if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
flowEntryEvents.isEmpty()) {
@@ -218,40 +213,30 @@
processFlowPathEvents();
processTopologyEvents();
- //
- // Add all new Flows: should be done after processing the Flow Path
- // and Topology events.
- //
- for (FlowPath flowPath : newFlowPaths) {
- allFlowPaths.put(flowPath.flowId().value(), flowPath);
- }
-
processFlowEntryEvents();
// Recompute all affected Flow Paths and keep only the modified
- for (FlowPath flowPath : recomputeFlowPaths) {
+ for (FlowPath flowPath : shouldRecomputeFlowPaths.values()) {
if (recomputeFlowPath(flowPath))
- modifiedFlowPaths.add(flowPath);
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
}
- modifiedFlowEntries = extractModifiedFlowEntries(modifiedFlowPaths);
+ // Extract the modified Flow Entries
+ modifiedFlowEntries = extractModifiedFlowEntries(modifiedFlowPaths.values());
// Assign missing Flow Entry IDs
assignFlowEntryId(modifiedFlowEntries);
//
- // Push the modified Flow Entries to switches, datagrid and database
+ // Push the modified state to the Flow Manager
//
- flowManager.pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
- flowManager.pushModifiedFlowEntriesToDatagrid(modifiedFlowEntries);
- flowManager.pushModifiedFlowEntriesToDatabase(modifiedFlowEntries);
- flowManager.pushModifiedFlowEntriesToDatabase(updatedFlowEntries);
- flowManager.pushModifiedFlowEntriesToDatabase(unmatchedDeleteFlowEntries);
+ flowManager.pushModifiedFlowState(modifiedFlowPaths.values(),
+ modifiedFlowEntries);
//
// Remove Flow Entries that were deleted
//
- for (FlowPath flowPath : modifiedFlowPaths)
+ for (FlowPath flowPath : modifiedFlowPaths.values())
flowPath.dataPath().removeDeletedFlowEntries();
// Cleanup
@@ -259,29 +244,26 @@
flowPathEvents.clear();
flowEntryEvents.clear();
//
- newFlowPaths.clear();
- recomputeFlowPaths.clear();
+ shouldRecomputeFlowPaths.clear();
modifiedFlowPaths.clear();
- updatedFlowEntries.clear();
- unmatchedDeleteFlowEntries.clear();
}
/**
* Extract the modified Flow Entries.
+ *
+ * @param modifiedFlowPaths the Flow Paths to process.
+ * @return a collection with the modified Flow Entries.
*/
- private List<FlowPathEntryPair> extractModifiedFlowEntries(
- List<FlowPath> modifiedFlowPaths) {
- List<FlowPathEntryPair> modifiedFlowEntries =
- new LinkedList<FlowPathEntryPair>();
+ private Collection<FlowEntry> extractModifiedFlowEntries(
+ Collection<FlowPath> modifiedFlowPaths) {
+ List<FlowEntry> modifiedFlowEntries = new LinkedList<FlowEntry>();
// Extract only the modified Flow Entries
for (FlowPath flowPath : modifiedFlowPaths) {
for (FlowEntry flowEntry : flowPath.flowEntries()) {
if (flowEntry.flowEntrySwitchState() ==
FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
- FlowPathEntryPair flowPair =
- new FlowPathEntryPair(flowPath, flowEntry);
- modifiedFlowEntries.add(flowPair);
+ modifiedFlowEntries.add(flowEntry);
}
}
}
@@ -290,8 +272,11 @@
/**
* Assign the Flow Entry ID as needed.
+ *
+ * @param modifiedFlowEntries the collection of Flow Entries that need
+ * a Flow Entry ID assigned.
*/
- private void assignFlowEntryId(List<FlowPathEntryPair> modifiedFlowEntries) {
+ private void assignFlowEntryId(Collection<FlowEntry> modifiedFlowEntries) {
if (modifiedFlowEntries.isEmpty())
return;
@@ -300,9 +285,7 @@
//
// Assign the Flow Entry ID only for Flow Entries for my switches
//
- for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
- FlowEntry flowEntry = flowPair.flowEntry;
- // Update the Flow Entries only for my switches
+ for (FlowEntry flowEntry : modifiedFlowEntries) {
IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
if (mySwitch == null)
continue;
@@ -334,10 +317,8 @@
if (allFlowPaths.get(flowPath.flowId().value()) != null) {
//
// TODO: What to do if the Flow Path already exists?
- // Remove and then re-add it, or merge the info?
- // For now, we don't have to do anything.
+ // For now, we just re-add it.
//
- break;
}
switch (flowPath.flowPathType()) {
@@ -347,7 +328,8 @@
// we are going to recompute it anyway.
//
flowPath.flowEntries().clear();
- recomputeFlowPaths.add(flowPath);
+ shouldRecomputeFlowPaths.put(flowPath.flowId().value(),
+ flowPath);
break;
case FP_TYPE_EXPLICIT_PATH:
//
@@ -356,10 +338,10 @@
for (FlowEntry flowEntry : flowPath.flowEntries()) {
flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
}
- modifiedFlowPaths.add(flowPath);
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
break;
}
- newFlowPaths.add(flowPath);
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
break;
}
@@ -382,8 +364,11 @@
flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
}
- allFlowPaths.remove(existingFlowPath.flowId().value());
- modifiedFlowPaths.add(existingFlowPath);
+ // Remove the Flow Path from the internal state
+ Long key = existingFlowPath.flowId().value();
+ allFlowPaths.remove(key);
+ shouldRecomputeFlowPaths.remove(key);
+ modifiedFlowPaths.put(key, existingFlowPath);
break;
}
@@ -416,7 +401,7 @@
}
if (isTopologyModified) {
// TODO: For now, if the topology changes, we recompute all Flows
- recomputeFlowPaths.addAll(allFlowPaths.values());
+ shouldRecomputeFlowPaths.putAll(allFlowPaths);
}
}
@@ -424,7 +409,6 @@
* Process the Flow Entry events.
*/
private void processFlowEntryEvents() {
- FlowPathEntryPair flowPair;
FlowPath flowPath;
FlowEntry updatedFlowEntry;
@@ -443,8 +427,7 @@
flowEntry);
continue;
}
- flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
- updatedFlowEntries.add(flowPair);
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
}
unmatchedFlowEntryAdd = remainingUpdates;
}
@@ -480,16 +463,15 @@
break;
}
// Add the updated entry to the list of updated Flow Entries
- flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
- updatedFlowEntries.add(flowPair);
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
break;
case ENTRY_REMOVE:
flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
if (unmatchedFlowEntryAdd.remove(flowEntry.flowEntryId().value()) != null) {
- continue; // Match found
+ continue; // Removed previously unmatched entry
}
-
+
flowPath = allFlowPaths.get(flowEntry.flowId().value());
if (flowPath == null) {
// Flow Path not found: ignore the update
@@ -497,13 +479,10 @@
}
updatedFlowEntry = updateFlowEntryRemove(flowPath, flowEntry);
if (updatedFlowEntry == null) {
- // Flow Entry not found: add to list of deleted entries
- flowPair = new FlowPathEntryPair(flowPath, flowEntry);
- unmatchedDeleteFlowEntries.add(flowPair);
+ // Flow Entry not found: ignore it
break;
}
- flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
- updatedFlowEntries.add(flowPair);
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
break;
}
}
@@ -883,4 +862,31 @@
new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_ADD, topologyElement);
networkEvents.add(eventEntry);
}
+
+ /**
+ * Get a sorted copy of all Flow Paths.
+ *
+ * @return a sorted copy of all Flow Paths.
+ */
+ synchronized SortedMap<Long, FlowPath> getAllFlowPathsCopy() {
+ SortedMap<Long, FlowPath> sortedFlowPaths =
+ new TreeMap<Long, FlowPath>();
+
+ //
+ // TODO: For now we use serialization/deserialization to create
+ // a copy of each Flow Path. In the future, we should use proper
+ // copy constructors.
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ synchronized (allFlowPaths) {
+ for (Map.Entry<Long, FlowPath> entry : allFlowPaths.entrySet()) {
+ FlowPath origFlowPath = entry.getValue();
+ FlowPath copyFlowPath = kryo.copy(origFlowPath);
+ sortedFlowPaths.put(entry.getKey(), copyFlowPath);
+ }
+ }
+ kryoFactory.deleteKryo(kryo);
+
+ return sortedFlowPaths;
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 9e1c1ec..a2b8728 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -4,8 +4,10 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@@ -61,8 +63,8 @@
private final static Logger log = LoggerFactory.getLogger(FlowManager.class);
// The queue to write Flow Entries to the database
- private BlockingQueue<FlowPathEntryPair> flowEntriesToDatabaseQueue =
- new LinkedBlockingQueue<FlowPathEntryPair>();
+ private BlockingQueue<FlowPath> flowPathsToDatabaseQueue =
+ new LinkedBlockingQueue<FlowPath>();
FlowDatabaseWriter flowDatabaseWriter;
/**
@@ -200,7 +202,7 @@
// The thread to write to the database
//
flowDatabaseWriter = new FlowDatabaseWriter(this,
- flowEntriesToDatabaseQueue);
+ flowPathsToDatabaseQueue);
flowDatabaseWriter.start();
//
@@ -218,11 +220,17 @@
* Add a flow.
*
* @param flowPath the Flow Path to install.
- * @param flowId the return-by-reference Flow ID as assigned internally.
- * @return true on success, otherwise false.
+ * @return the Flow ID on success, otherwise null.
*/
@Override
- public boolean addFlow(FlowPath flowPath, FlowId flowId) {
+ public FlowId addFlow(FlowPath flowPath) {
+
+ // Allocate the Flow ID if necessary
+ if (! flowPath.isValidFlowId()) {
+ long id = getNextFlowEntryId();
+ flowPath.setFlowId(new FlowId(id));
+ }
+
//
// NOTE: We need to explicitly initialize some of the state,
// in case the application didn't do it.
@@ -236,38 +244,11 @@
flowEntry.setFlowId(new FlowId(flowPath.flowId().value()));
}
- dbHandlerApi = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
- if (FlowDatabaseOperation.addFlow(this, dbHandlerApi, flowPath, flowId)) {
+ if (FlowDatabaseOperation.addFlow(dbHandlerApi, flowPath)) {
datagridService.notificationSendFlowAdded(flowPath);
- return true;
+ return flowPath.flowId();
}
- return false;
- }
-
- /**
- * Add a flow entry to the Network MAP.
- *
- * @param flowObj the corresponding Flow Path object for the Flow Entry.
- * @param flowEntry the Flow Entry to install.
- * @return the added Flow Entry object on success, otherwise null.
- */
- private IFlowEntry addFlowEntry(IFlowPath flowObj, FlowEntry flowEntry) {
- dbHandlerInner = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
- return FlowDatabaseOperation.addFlowEntry(this, dbHandlerInner,
- flowObj, flowEntry);
- }
-
- /**
- * Delete a flow entry from the Network MAP.
- *
- * @param flowObj the corresponding Flow Path object for the Flow Entry.
- * @param flowEntry the Flow Entry to delete.
- * @return true on success, otherwise false.
- */
- private boolean deleteFlowEntry(IFlowPath flowObj, FlowEntry flowEntry) {
- dbHandlerInner = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
- return FlowDatabaseOperation.deleteFlowEntry(dbHandlerInner,
- flowObj, flowEntry);
+ return null;
}
/**
@@ -325,35 +306,6 @@
}
/**
- * Get all previously added flows by a specific installer for a given
- * data path endpoints.
- *
- * @param installerId the Caller ID of the installer of the flow to get.
- * @param dataPathEndpoints the data path endpoints of the flow to get.
- * @return the Flow Paths if found, otherwise null.
- */
- @Override
- public ArrayList<FlowPath> getAllFlows(CallerId installerId,
- DataPathEndpoints dataPathEndpoints) {
- dbHandlerApi = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
- return FlowDatabaseOperation.getAllFlows(dbHandlerApi, installerId,
- dataPathEndpoints);
- }
-
- /**
- * Get all installed flows by all installers for given data path endpoints.
- *
- * @param dataPathEndpoints the data path endpoints of the flows to get.
- * @return the Flow Paths if found, otherwise null.
- */
- @Override
- public ArrayList<FlowPath> getAllFlows(DataPathEndpoints dataPathEndpoints) {
- dbHandlerApi = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
- return FlowDatabaseOperation.getAllFlows(dbHandlerApi,
- dataPathEndpoints);
- }
-
- /**
* Get summary of all installed flows by all installers in a given range.
*
* @param flowId the Flow ID of the first flow in the flow range to get.
@@ -363,31 +315,28 @@
@Override
public ArrayList<FlowPath> getAllFlowsSummary(FlowId flowId,
int maxFlows) {
- return FlowDatabaseOperation.getAllFlowsSummary(dbHandlerApi, flowId,
- maxFlows);
- }
-
- /**
- * Add and maintain a shortest-path flow.
- *
- * NOTE: The Flow Path argument does NOT contain flow entries.
- *
- * @param flowPath the Flow Path with the endpoints and the match
- * conditions to install.
- * @return the added shortest-path flow on success, otherwise null.
- */
- @Override
- public FlowPath addAndMaintainShortestPathFlow(FlowPath flowPath) {
- //
- // Don't do the shortest path computation here.
- // Instead, let the Flow reconciliation thread take care of it.
- //
+ ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+ SortedMap<Long, FlowPath> sortedFlowPaths =
+ flowEventHandler.getAllFlowPathsCopy();
- FlowId flowId = new FlowId();
- if (! addFlow(flowPath, flowId))
- return null;
+ //
+ // Truncate each Flow Path and Flow Entry
+ //
+ for (FlowPath flowPath : sortedFlowPaths.values()) {
+ //
+ // TODO: Add only the Flow Paths that have been successfully
+ // installed.
+ //
+ flowPath.setFlowEntryMatch(null);
+ flowPath.setFlowEntryActions(null);
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ flowEntry.setFlowEntryMatch(null);
+ flowEntry.setFlowEntryActions(null);
+ }
+ flowPaths.add(flowPath);
+ }
- return (flowPath);
+ return flowPaths;
}
/**
@@ -419,101 +368,35 @@
}
/**
- * Push modified Flow Entries to switches.
+ * Inform the Flow Manager that a collection of Flow Entries have been
+ * pushed to a switch.
*
- * NOTE: Only the Flow Entries to switches controlled by this instance
- * are pushed.
- *
- * @param modifiedFlowEntries the collection of modified Flow Entries.
+ * @param entries the collection of {@code <IOFSwitch, FlowEntry>} pairs
+ * that have been pushed.
*/
- public void pushModifiedFlowEntriesToSwitches(
- Collection<FlowPathEntryPair> modifiedFlowEntries) {
- if (modifiedFlowEntries.isEmpty())
- return;
+ public void flowEntriesPushedToSwitch(
+ Collection<Pair<IOFSwitch, FlowEntry>> entries) {
- Map<Long, IOFSwitch> mySwitches = getMySwitches();
-
- for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
- FlowPath flowPath = flowPair.flowPath;
- FlowEntry flowEntry = flowPair.flowEntry;
-
- IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
- if (mySwitch == null)
- continue;
-
- log.debug("Pushing Flow Entry To Switch: {}", flowEntry.toString());
+ //
+ // Process all entries
+ //
+ for (Pair<IOFSwitch, FlowEntry> entry : entries) {
+ IOFSwitch sw = entry.first;
+ FlowEntry flowEntry = entry.second;
//
- // Push the Flow Entry into the switch
- //
- if (! pusher.add(mySwitch, flowPath, flowEntry)) {
- String logMsg = "Cannot install Flow Entry " +
- flowEntry.flowEntryId() +
- " from Flow Path " + flowPath.flowId() +
- " on switch " + flowEntry.dpid();
- log.error(logMsg);
- continue;
- }
-
- //
- // NOTE: Here we assume that the switch has been
- // successfully updated.
+ // Mark the Flow Entry as having been pushed to the switch
//
flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
- }
- }
- /**
- * Push modified Flow Entries to the datagrid.
- *
- * @param modifiedFlowEntries the collection of modified Flow Entries.
- */
- public void pushModifiedFlowEntriesToDatagrid(
- Collection<FlowPathEntryPair> modifiedFlowEntries) {
- if (modifiedFlowEntries.isEmpty())
- return;
-
- Map<Long, IOFSwitch> mySwitches = getMySwitches();
-
- for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
- FlowEntry flowEntry = flowPair.flowEntry;
-
- if (! flowEntry.isValidFlowEntryId())
- continue;
-
- IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
-
- //
- // TODO: For now Flow Entries are removed by all instances,
- // even if this Flow Entry is not for our switches.
- //
- // This is needed to handle the case a switch going down:
- // it has no Master controller instance, hence no
- // controller instance will cleanup its flow entries.
- // This is sub-optimal: we need to elect a controller
- // instance to handle the cleanup of such orphaned flow
- // entries.
- //
- if (mySwitch == null) {
- if (flowEntry.flowEntryUserState() !=
- FlowEntryUserState.FE_USER_DELETE) {
- continue;
- }
- }
-
- log.debug("Pushing Flow Entry To Datagrid: {}", flowEntry.toString());
//
// Write the Flow Entry to the Datagrid
//
switch (flowEntry.flowEntryUserState()) {
case FE_USER_ADD:
- if (mySwitch == null)
- break; // Install only flow entries for my switches
datagridService.notificationSendFlowEntryAdded(flowEntry);
break;
case FE_USER_MODIFY:
- if (mySwitch == null)
- break; // Install only flow entries for my switches
datagridService.notificationSendFlowEntryUpdated(flowEntry);
break;
case FE_USER_DELETE:
@@ -524,11 +407,122 @@
}
/**
+ * Push modified Flow-related state as appropriate.
+ *
+ * @param modifiedFlowPaths the collection of modified Flow Paths.
+ * @param modifiedFlowEntries the collection of modified Flow Entries.
+ */
+ void pushModifiedFlowState(Collection<FlowPath> modifiedFlowPaths,
+ Collection<FlowEntry> modifiedFlowEntries) {
+ //
+ // Push the modified Flow state:
+ // - Flow Entries to switches and the datagrid
+ // - Flow Paths to the database
+ //
+ pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
+ pushModifiedFlowPathsToDatabase(modifiedFlowPaths);
+ cleanupDeletedFlowEntriesFromDatagrid(modifiedFlowEntries);
+ }
+
+ /**
+ * Push modified Flow Entries to switches.
+ *
+ * NOTE: Only the Flow Entries to switches controlled by this instance
+ * are pushed.
+ *
+ * @param modifiedFlowEntries the collection of modified Flow Entries.
+ */
+ private void pushModifiedFlowEntriesToSwitches(
+ Collection<FlowEntry> modifiedFlowEntries) {
+ if (modifiedFlowEntries.isEmpty())
+ return;
+
+ List<Pair<IOFSwitch, FlowEntry>> entries =
+ new LinkedList<Pair<IOFSwitch, FlowEntry>>();
+
+ Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+ //
+ // Create a collection of my Flow Entries to push
+ //
+ for (FlowEntry flowEntry : modifiedFlowEntries) {
+ IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+ if (mySwitch == null)
+ continue;
+
+ //
+ // Assign Flow Entry IDs if missing.
+ //
+ // NOTE: This is an additional safeguard, in case the
+ // mySwitches set has changed (after the Flow Entry IDs
+ // assignments by the caller).
+ //
+ if (! flowEntry.isValidFlowEntryId()) {
+ long id = getNextFlowEntryId();
+ flowEntry.setFlowEntryId(new FlowEntryId(id));
+ }
+
+ log.debug("Pushing Flow Entry To Switch: {}", flowEntry.toString());
+ entries.add(new Pair<IOFSwitch, FlowEntry>(mySwitch, flowEntry));
+ }
+
+ pusher.pushFlowEntries(entries);
+ }
+
+ /**
+ * Cleanup deleted Flow Entries from the datagrid.
+ *
+ * NOTE: We clean up only the Flow Entries that are not for our switches.
+ * This is needed to handle the case of a switch going down:
+ * It has no Master controller instance, hence no controller instance
+ * will clean up its flow entries.
+ * This is sub-optimal: we need to elect a controller instance to handle
+ * the cleanup of such orphaned flow entries.
+ *
+ * @param modifiedFlowEntries the collection of modified Flow Entries.
+ */
+ private void cleanupDeletedFlowEntriesFromDatagrid(
+ Collection<FlowEntry> modifiedFlowEntries) {
+ if (modifiedFlowEntries.isEmpty())
+ return;
+
+ Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+ for (FlowEntry flowEntry : modifiedFlowEntries) {
+ //
+ // Process only Flow Entries that should be deleted and have
+ // a valid Flow Entry ID.
+ //
+ if (! flowEntry.isValidFlowEntryId())
+ continue;
+ if (flowEntry.flowEntryUserState() !=
+ FlowEntryUserState.FE_USER_DELETE) {
+ continue;
+ }
+
+ //
+ // NOTE: The deletion of Flow Entries for my switches is handled
+ // elsewhere.
+ //
+ IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+ if (mySwitch != null)
+ continue;
+
+ log.debug("Pushing cleanup of Flow Entry To Datagrid: {}", flowEntry.toString());
+
+ //
+ // Write the Flow Entry to the Datagrid
+ //
+ datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+ }
+ }
+
+ /**
* Class to implement writing to the database in a separate thread.
*/
class FlowDatabaseWriter extends Thread {
private FlowManager flowManager;
- private BlockingQueue<FlowPathEntryPair> blockingQueue;
+ private BlockingQueue<FlowPath> blockingQueue;
/**
* Constructor.
@@ -537,7 +531,7 @@
* @param blockingQueue the blocking queue to use.
*/
FlowDatabaseWriter(FlowManager flowManager,
- BlockingQueue<FlowPathEntryPair> blockingQueue) {
+ BlockingQueue<FlowPath> blockingQueue) {
this.flowManager = flowManager;
this.blockingQueue = blockingQueue;
}
@@ -550,14 +544,13 @@
//
// The main loop
//
- Collection<FlowPathEntryPair> collection =
- new LinkedList<FlowPathEntryPair>();
+ Collection<FlowPath> collection = new LinkedList<FlowPath>();
try {
while (true) {
- FlowPathEntryPair entryPair = blockingQueue.take();
- collection.add(entryPair);
+ FlowPath flowPath = blockingQueue.take();
+ collection.add(flowPath);
blockingQueue.drainTo(collection);
- flowManager.writeModifiedFlowEntriesToDatabase(collection);
+ flowManager.writeModifiedFlowPathsToDatabase(collection);
collection.clear();
}
} catch (Exception exception) {
@@ -567,46 +560,40 @@
}
/**
- * Push Flow Entries to the Network MAP.
+ * Push Flow Paths to the Network MAP.
*
- * NOTE: The Flow Entries are pushed only on the instance responsible
- * for the first switch. This is to avoid database errors when multiple
- * instances are writing Flow Entries for the same Flow Path.
+ * NOTE: The complete Flow Paths are pushed only on the instance
+ * responsible for the first switch. This is to avoid database errors
+ * when multiple instances are writing Flow Entries for the same Flow Path.
*
- * @param modifiedFlowEntries the collection of Flow Entries to push.
+ * @param modifiedFlowPaths the collection of Flow Paths to push.
*/
- void pushModifiedFlowEntriesToDatabase(
- Collection<FlowPathEntryPair> modifiedFlowEntries) {
+ private void pushModifiedFlowPathsToDatabase(
+ Collection<FlowPath> modifiedFlowPaths) {
//
- // We only add the Flow Entries to the Database Queue.
+ // We only add the Flow Paths to the Database Queue.
// The FlowDatabaseWriter thread is responsible for the actual writing.
//
- flowEntriesToDatabaseQueue.addAll(modifiedFlowEntries);
+ flowPathsToDatabaseQueue.addAll(modifiedFlowPaths);
}
/**
- * Write Flow Entries to the Network MAP.
+ * Write Flow Paths to the Network MAP.
*
- * NOTE: The Flow Entries are written only on the instance responsible
- * for the first switch. This is to avoid database errors when multiple
- * instances are writing Flow Entries for the same Flow Path.
+ * NOTE: The complete Flow Paths are written only on the instance
+ * responsible for the first switch. This is to avoid database errors
+ * when multiple instances are writing Flow Entries for the same Flow Path.
*
- * @param modifiedFlowEntries the collection of Flow Entries to write.
+ * @param modifiedFlowPaths the collection of Flow Paths to write.
*/
- private void writeModifiedFlowEntriesToDatabase(
- Collection<FlowPathEntryPair> modifiedFlowEntries) {
- if (modifiedFlowEntries.isEmpty())
+ private void writeModifiedFlowPathsToDatabase(
+ Collection<FlowPath> modifiedFlowPaths) {
+ if (modifiedFlowPaths.isEmpty())
return;
Map<Long, IOFSwitch> mySwitches = getMySwitches();
- for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
- FlowPath flowPath = flowPair.flowPath;
- FlowEntry flowEntry = flowPair.flowEntry;
-
- if (! flowEntry.isValidFlowEntryId())
- continue;
-
+ for (FlowPath flowPath : modifiedFlowPaths) {
//
// Push the changes only on the instance responsible for the
// first switch.
@@ -616,71 +603,57 @@
if (mySrcSwitch == null)
continue;
- log.debug("Pushing Flow Entry To Database: {}", flowEntry.toString());
//
- // Write the Flow Entry to the Network Map
+ // Delete the Flow Path from the Network Map
//
- // NOTE: We try a number of times, in case somehow some other
- // instances are writing at the same time.
- // Apparently, if other instances are writing at the same time
- // this will trigger an error.
- //
- for (int i = 0; i < 6; i++) {
+ if (flowPath.flowPathUserState() ==
+ FlowPathUserState.FP_USER_DELETE) {
+ log.debug("Deleting Flow Path From Database: {}",
+ flowPath.toString());
+
try {
- //
- // Find the Flow Path in the Network MAP.
- //
- // NOTE: The Flow Path might not be found if the Flow was
- // just removed by some other controller instance.
- //
- System.out.println("writeModifiedFlowEntriesToDatabase");
- dbHandlerInner = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
- IFlowPath flowObj =
- dbHandlerInner.searchFlowPath(flowEntry.flowId());
- if (flowObj == null) {
- String logMsg = "Cannot find Network MAP entry for Flow Path " + flowEntry.flowId();
- log.error(logMsg);
- break;
+ if (! FlowDatabaseOperation.deleteFlow(
+ dbHandlerInner,
+ flowPath.flowId())) {
+ log.error("Cannot delete Flow Path {} from Network Map",
+ flowPath.flowId());
}
-
- // Write the Flow Entry
- switch (flowEntry.flowEntryUserState()) {
- case FE_USER_ADD:
- // FALLTHROUGH
- case FE_USER_MODIFY:
- if (addFlowEntry(flowObj, flowEntry) == null) {
- String logMsg = "Cannot write to Network MAP Flow Entry " +
- flowEntry.flowEntryId() +
- " from Flow Path " + flowEntry.flowId() +
- " on switch " + flowEntry.dpid();
- log.error(logMsg);
- }
- break;
- case FE_USER_DELETE:
- if (deleteFlowEntry(flowObj, flowEntry) == false) {
- String logMsg = "Cannot remove from Network MAP Flow Entry " +
- flowEntry.flowEntryId() +
- " from Flow Path " + flowEntry.flowId() +
- " on switch " + flowEntry.dpid();
- log.error(logMsg);
- }
- break;
- }
-
- // Commit to the database
- dbHandlerInner.commit();
- break; // Success
-
} catch (Exception e) {
- log.debug("Exception writing Flow Entry to Network MAP: ", e);
- dbHandlerInner.rollback();
- // Wait a bit (random value [1ms, 20ms] and try again
- int delay = 1 + randomGenerator.nextInt() % 20;
- try {
- Thread.sleep(delay);
- } catch (Exception e0) {
- }
+ log.error("Exception deleting Flow Path from Network MAP: {}", e);
}
+ continue;
+ }
+
+ //
+ // Test whether all Flow Entries are valid
+ //
+ boolean allValid = true;
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (flowEntry.flowEntryUserState() ==
+ FlowEntryUserState.FE_USER_DELETE) {
+ continue;
+ }
+ if (! flowEntry.isValidFlowEntryId()) {
+ allValid = false;
+ break;
+ }
+ }
+ if (! allValid)
+ continue;
+
+ log.debug("Pushing Flow Path To Database: {}", flowPath.toString());
+
+ //
+ // Write the Flow Path to the Network Map
+ //
+ try {
+ if (! FlowDatabaseOperation.addFlow(dbHandlerInner, flowPath)) {
+ String logMsg = "Cannot write to Network Map Flow Path " +
+ flowPath.flowId();
+ log.error(logMsg);
+ }
+ } catch (Exception e) {
+ log.error("Exception writing Flow Path to Network MAP: ", e);
}
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java
deleted file mode 100644
index 8bed120..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java
+++ /dev/null
@@ -1,689 +0,0 @@
-package net.onrc.onos.ofcontroller.flowmanager;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import net.floodlightcontroller.core.IOFSwitch;
-import net.floodlightcontroller.util.MACAddress;
-import net.floodlightcontroller.util.OFMessageDamper;
-
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
-import net.onrc.onos.ofcontroller.util.*;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
-
-import org.openflow.protocol.OFFlowMod;
-import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.OFPacketOut;
-import org.openflow.protocol.OFPort;
-import org.openflow.protocol.OFType;
-import org.openflow.protocol.action.*;
-import org.openflow.protocol.factory.BasicFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class for performing Flow-related operations on the Switch.
- */
-class FlowSwitchOperation {
- private final static Logger log = LoggerFactory.getLogger(FlowSwitchOperation.class);
- //
- // TODO: Values copied from elsewhere (class LearningSwitch).
- // The local copy should go away!
- //
- public static final short PRIORITY_DEFAULT = 100;
- public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
- public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
-
- // TODO add Pusher instance member
- //
-
- /**
- * Install a Flow Entry on a switch.
- *
- * @param messageFactory the OpenFlow message factory to use.
- * @param messageDamper the OpenFlow message damper to use.
- * @param mySwitch the switch to install the Flow Entry into.
- * @param flowObj the flow path object for the flow entry to install.
- * @param flowEntryObj the flow entry object to install.
- * @return true on success, otherwise false.
- */
- static boolean installFlowEntry(BasicFactory messageFactory,
- OFMessageDamper messageDamper,
- IOFSwitch mySwitch, IFlowPath flowObj,
- IFlowEntry flowEntryObj) {
- String flowEntryIdStr = flowEntryObj.getFlowEntryId();
- if (flowEntryIdStr == null)
- return false;
- FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
- String userState = flowEntryObj.getUserState();
- if (userState == null)
- return false;
-
- //
- // Create the Open Flow Flow Modification Entry to push
- //
- OFFlowMod fm = (OFFlowMod)messageFactory.getMessage(OFType.FLOW_MOD);
- long cookie = flowEntryId.value();
-
- short flowModCommand = OFFlowMod.OFPFC_ADD;
- if (userState.equals("FE_USER_ADD")) {
- flowModCommand = OFFlowMod.OFPFC_ADD;
- } else if (userState.equals("FE_USER_MODIFY")) {
- flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
- } else if (userState.equals("FE_USER_DELETE")) {
- flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
- } else {
- // Unknown user state. Ignore the entry
- log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
- flowEntryId.toString(), userState);
- return false;
- }
-
- //
- // Fetch the match conditions.
- //
- // NOTE: The Flow matching conditions common for all Flow Entries are
- // used ONLY if a Flow Entry does NOT have the corresponding matching
- // condition set.
- //
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
-
- // Match the Incoming Port
- Short matchInPort = flowEntryObj.getMatchInPort();
- if (matchInPort != null) {
- match.setInputPort(matchInPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
- }
-
- // Match the Source MAC address
- String matchSrcMac = flowEntryObj.getMatchSrcMac();
- if (matchSrcMac == null)
- matchSrcMac = flowObj.getMatchSrcMac();
- if (matchSrcMac != null) {
- match.setDataLayerSource(matchSrcMac);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
- }
-
- // Match the Destination MAC address
- String matchDstMac = flowEntryObj.getMatchDstMac();
- if (matchDstMac == null)
- matchDstMac = flowObj.getMatchDstMac();
- if (matchDstMac != null) {
- match.setDataLayerDestination(matchDstMac);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
- }
-
- // Match the Ethernet Frame Type
- Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType == null)
- matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType != null) {
- match.setDataLayerType(matchEthernetFrameType);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
- }
-
- // Match the VLAN ID
- Short matchVlanId = flowEntryObj.getMatchVlanId();
- if (matchVlanId == null)
- matchVlanId = flowObj.getMatchVlanId();
- if (matchVlanId != null) {
- match.setDataLayerVirtualLan(matchVlanId);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
- }
-
- // Match the VLAN priority
- Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
- if (matchVlanPriority == null)
- matchVlanPriority = flowObj.getMatchVlanPriority();
- if (matchVlanPriority != null) {
- match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
- }
-
- // Match the Source IPv4 Network prefix
- String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net == null)
- matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net != null) {
- match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
- }
-
- // Natch the Destination IPv4 Network prefix
- String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net == null)
- matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net != null) {
- match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
- }
-
- // Match the IP protocol
- Byte matchIpProto = flowEntryObj.getMatchIpProto();
- if (matchIpProto == null)
- matchIpProto = flowObj.getMatchIpProto();
- if (matchIpProto != null) {
- match.setNetworkProtocol(matchIpProto);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
- }
-
- // Match the IP ToS (DSCP field, 6 bits)
- Byte matchIpToS = flowEntryObj.getMatchIpToS();
- if (matchIpToS == null)
- matchIpToS = flowObj.getMatchIpToS();
- if (matchIpToS != null) {
- match.setNetworkTypeOfService(matchIpToS);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
- }
-
- // Match the Source TCP/UDP port
- Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort == null)
- matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort != null) {
- match.setTransportSource(matchSrcTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
- }
-
- // Match the Destination TCP/UDP port
- Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort == null)
- matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort != null) {
- match.setTransportDestination(matchDstTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
- }
-
- //
- // Fetch the actions
- //
- Short actionOutputPort = null;
- List<OFAction> openFlowActions = new ArrayList<OFAction>();
- int actionsLen = 0;
- FlowEntryActions flowEntryActions = null;
- String actionsStr = flowEntryObj.getActions();
- if (actionsStr != null)
- flowEntryActions = new FlowEntryActions(actionsStr);
- else
- flowEntryActions = new FlowEntryActions();
- for (FlowEntryAction action : flowEntryActions.actions()) {
- ActionOutput actionOutput = action.actionOutput();
- ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
- ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
- ActionStripVlan actionStripVlan = action.actionStripVlan();
- ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
- ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
- ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
- ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
- ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
- ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
- ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
- ActionEnqueue actionEnqueue = action.actionEnqueue();
-
- if (actionOutput != null) {
- actionOutputPort = actionOutput.port().value();
- // XXX: The max length is hard-coded for now
- OFActionOutput ofa =
- new OFActionOutput(actionOutput.port().value(),
- (short)0xffff);
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanId != null) {
- OFActionVirtualLanIdentifier ofa =
- new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanPriority != null) {
- OFActionVirtualLanPriorityCodePoint ofa =
- new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionStripVlan != null) {
- if (actionStripVlan.stripVlan() == true) {
- OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- if (actionSetEthernetSrcAddr != null) {
- OFActionDataLayerSource ofa =
- new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetEthernetDstAddr != null) {
- OFActionDataLayerDestination ofa =
- new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4SrcAddr != null) {
- OFActionNetworkLayerSource ofa =
- new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4DstAddr != null) {
- OFActionNetworkLayerDestination ofa =
- new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIpToS != null) {
- OFActionNetworkTypeOfService ofa =
- new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpSrcPort != null) {
- OFActionTransportLayerSource ofa =
- new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpDstPort != null) {
- OFActionTransportLayerDestination ofa =
- new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionEnqueue != null) {
- OFActionEnqueue ofa =
- new OFActionEnqueue(actionEnqueue.port().value(),
- actionEnqueue.queueId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
- .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
- .setPriority(PRIORITY_DEFAULT)
- .setBufferId(OFPacketOut.BUFFER_ID_NONE)
- .setCookie(cookie)
- .setCommand(flowModCommand)
- .setMatch(match)
- .setActions(openFlowActions)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
- fm.setOutPort(OFPort.OFPP_NONE.getValue());
- if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
- (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
- if (actionOutputPort != null)
- fm.setOutPort(actionOutputPort);
- }
-
- //
- // TODO: Set the following flag
- // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
- // See method ForwardingBase::pushRoute()
- //
-
- //
- // Write the message to the switch
- //
- log.debug("MEASUREMENT: Installing flow entry " + userState +
- " into switch DPID: " +
- mySwitch.getStringId() +
- " flowEntryId: " + flowEntryId.toString() +
- " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
- " inPort: " + matchInPort + " outPort: " + actionOutputPort
- );
- try {
- messageDamper.write(mySwitch, fm, null);
- mySwitch.flush();
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and update the SwitchState
- // for a flow entry after the Barrier message is
- // is received.
- //
- flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
- } catch (IOException e) {
- log.error("Failure writing flow mod from network map", e);
- return false;
- }
-
- return true;
- }
-
- /**
- * Install a Flow Entry on a switch.
- *
- * @param messageFactory the OpenFlow message factory to use.
- * @maram messageDamper the OpenFlow message damper to use.
- * @param mySwitch the switch to install the Flow Entry into.
- * @param flowPath the flow path for the flow entry to install.
- * @param flowEntry the flow entry to install.
- * @return true on success, otherwise false.
- */
- static boolean installFlowEntry(BasicFactory messageFactory,
- OFMessageDamper messageDamper,
- IOFSwitch mySwitch, FlowPath flowPath,
- FlowEntry flowEntry) {
- //
- // Create the OpenFlow Flow Modification Entry to push
- //
- OFFlowMod fm = (OFFlowMod)messageFactory.getMessage(OFType.FLOW_MOD);
- long cookie = flowEntry.flowEntryId().value();
-
- short flowModCommand = OFFlowMod.OFPFC_ADD;
- if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
- flowModCommand = OFFlowMod.OFPFC_ADD;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
- flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
- flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
- } else {
- // Unknown user state. Ignore the entry
- log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
- flowEntry.flowEntryId().toString(),
- flowEntry.flowEntryUserState());
- return false;
- }
-
- //
- // Fetch the match conditions.
- //
- // NOTE: The Flow matching conditions common for all Flow Entries are
- // used ONLY if a Flow Entry does NOT have the corresponding matching
- // condition set.
- //
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
- FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
- FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
-
- // Match the Incoming Port
- Port matchInPort = flowEntryMatch.inPort();
- if (matchInPort != null) {
- match.setInputPort(matchInPort.value());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
- }
-
- // Match the Source MAC address
- MACAddress matchSrcMac = flowEntryMatch.srcMac();
- if ((matchSrcMac == null) && (flowPathMatch != null)) {
- matchSrcMac = flowPathMatch.srcMac();
- }
- if (matchSrcMac != null) {
- match.setDataLayerSource(matchSrcMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
- }
-
- // Match the Destination MAC address
- MACAddress matchDstMac = flowEntryMatch.dstMac();
- if ((matchDstMac == null) && (flowPathMatch != null)) {
- matchDstMac = flowPathMatch.dstMac();
- }
- if (matchDstMac != null) {
- match.setDataLayerDestination(matchDstMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
- }
-
- // Match the Ethernet Frame Type
- Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
- if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
- matchEthernetFrameType = flowPathMatch.ethernetFrameType();
- }
- if (matchEthernetFrameType != null) {
- match.setDataLayerType(matchEthernetFrameType);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
- }
-
- // Match the VLAN ID
- Short matchVlanId = flowEntryMatch.vlanId();
- if ((matchVlanId == null) && (flowPathMatch != null)) {
- matchVlanId = flowPathMatch.vlanId();
- }
- if (matchVlanId != null) {
- match.setDataLayerVirtualLan(matchVlanId);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
- }
-
- // Match the VLAN priority
- Byte matchVlanPriority = flowEntryMatch.vlanPriority();
- if ((matchVlanPriority == null) && (flowPathMatch != null)) {
- matchVlanPriority = flowPathMatch.vlanPriority();
- }
- if (matchVlanPriority != null) {
- match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
- }
-
- // Match the Source IPv4 Network prefix
- IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
- if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
- matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
- }
- if (matchSrcIPv4Net != null) {
- match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
- }
-
- // Natch the Destination IPv4 Network prefix
- IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
- if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
- matchDstIPv4Net = flowPathMatch.dstIPv4Net();
- }
- if (matchDstIPv4Net != null) {
- match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
- }
-
- // Match the IP protocol
- Byte matchIpProto = flowEntryMatch.ipProto();
- if ((matchIpProto == null) && (flowPathMatch != null)) {
- matchIpProto = flowPathMatch.ipProto();
- }
- if (matchIpProto != null) {
- match.setNetworkProtocol(matchIpProto);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
- }
-
- // Match the IP ToS (DSCP field, 6 bits)
- Byte matchIpToS = flowEntryMatch.ipToS();
- if ((matchIpToS == null) && (flowPathMatch != null)) {
- matchIpToS = flowPathMatch.ipToS();
- }
- if (matchIpToS != null) {
- match.setNetworkTypeOfService(matchIpToS);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
- }
-
- // Match the Source TCP/UDP port
- Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
- if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
- matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
- }
- if (matchSrcTcpUdpPort != null) {
- match.setTransportSource(matchSrcTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
- }
-
- // Match the Destination TCP/UDP port
- Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
- if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
- matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
- }
- if (matchDstTcpUdpPort != null) {
- match.setTransportDestination(matchDstTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
- }
-
- //
- // Fetch the actions
- //
- Short actionOutputPort = null;
- List<OFAction> openFlowActions = new ArrayList<OFAction>();
- int actionsLen = 0;
- FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- //
- for (FlowEntryAction action : flowEntryActions.actions()) {
- ActionOutput actionOutput = action.actionOutput();
- ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
- ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
- ActionStripVlan actionStripVlan = action.actionStripVlan();
- ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
- ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
- ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
- ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
- ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
- ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
- ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
- ActionEnqueue actionEnqueue = action.actionEnqueue();
-
- if (actionOutput != null) {
- actionOutputPort = actionOutput.port().value();
- // XXX: The max length is hard-coded for now
- OFActionOutput ofa =
- new OFActionOutput(actionOutput.port().value(),
- (short)0xffff);
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanId != null) {
- OFActionVirtualLanIdentifier ofa =
- new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanPriority != null) {
- OFActionVirtualLanPriorityCodePoint ofa =
- new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionStripVlan != null) {
- if (actionStripVlan.stripVlan() == true) {
- OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- if (actionSetEthernetSrcAddr != null) {
- OFActionDataLayerSource ofa =
- new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetEthernetDstAddr != null) {
- OFActionDataLayerDestination ofa =
- new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4SrcAddr != null) {
- OFActionNetworkLayerSource ofa =
- new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4DstAddr != null) {
- OFActionNetworkLayerDestination ofa =
- new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIpToS != null) {
- OFActionNetworkTypeOfService ofa =
- new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpSrcPort != null) {
- OFActionTransportLayerSource ofa =
- new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpDstPort != null) {
- OFActionTransportLayerDestination ofa =
- new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionEnqueue != null) {
- OFActionEnqueue ofa =
- new OFActionEnqueue(actionEnqueue.port().value(),
- actionEnqueue.queueId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
- .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
- .setPriority(PRIORITY_DEFAULT)
- .setBufferId(OFPacketOut.BUFFER_ID_NONE)
- .setCookie(cookie)
- .setCommand(flowModCommand)
- .setMatch(match)
- .setActions(openFlowActions)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
- fm.setOutPort(OFPort.OFPP_NONE.getValue());
- if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
- (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
- if (actionOutputPort != null)
- fm.setOutPort(actionOutputPort);
- }
-
- //
- // TODO: Set the following flag
- // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
- // See method ForwardingBase::pushRoute()
- //
-
- //
- // Write the message to the switch
- //
- log.debug("MEASUREMENT: Installing flow entry " +
- flowEntry.flowEntryUserState() +
- " into switch DPID: " +
- mySwitch.getStringId() +
- " flowEntryId: " + flowEntry.flowEntryId().toString() +
- " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
- " inPort: " + matchInPort + " outPort: " + actionOutputPort
- );
- try {
- messageDamper.write(mySwitch, fm, null);
- mySwitch.flush();
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and update the SwitchState
- // for a flow entry after the Barrier message is
- // is received.
- //
- // TODO: The FlowEntry Object in Titan should be set
- // to FE_SWITCH_UPDATED.
- //
- } catch (IOException e) {
- log.error("Failure writing flow mod from network map", e);
- return false;
- }
- return true;
- }
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
index 8d2b797..a25602d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
@@ -1,15 +1,18 @@
package net.onrc.onos.ofcontroller.flowmanager;
import java.util.ArrayList;
+import java.util.Collection;
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.util.CallerId;
import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.Pair;
/**
* Interface for providing Flow Service to other modules.
@@ -18,14 +21,10 @@
/**
* Add a flow.
*
- * Internally, ONOS will automatically register the installer for
- * receiving Flow Path Notifications for that path.
- *
* @param flowPath the Flow Path to install.
- * @param flowId the return-by-reference Flow ID as assigned internally.
- * @return true on success, otherwise false.
+ * @return the Flow ID on success, otherwise null.
*/
- boolean addFlow(FlowPath flowPath, FlowId flowId);
+ FlowId addFlow(FlowPath flowPath);
/**
* Delete all previously added flows.
@@ -58,25 +57,6 @@
ArrayList<FlowPath> getAllFlows();
/**
- * Get all previously added flows by a specific installer for a given
- * data path endpoints.
- *
- * @param installerId the Caller ID of the installer of the flow to get.
- * @param dataPathEndpoints the data path endpoints of the flow to get.
- * @return the Flow Paths if found, otherwise null.
- */
- ArrayList<FlowPath> getAllFlows(CallerId installerId,
- DataPathEndpoints dataPathEndpoints);
-
- /**
- * Get all installed flows by all installers for given data path endpoints.
- *
- * @param dataPathEndpoints the data path endpoints of the flows to get.
- * @return the Flow Paths if found, otherwise null.
- */
- ArrayList<FlowPath> getAllFlows(DataPathEndpoints dataPathEndpoints);
-
- /**
* Get summary of all installed flows by all installers.
*
* @param flowId starting flow Id of the range
@@ -84,21 +64,6 @@
* @return the Flow Paths if found, otherwise null.
*/
ArrayList<FlowPath> getAllFlowsSummary(FlowId flowId, int maxFlows);
-
- /**
- * Add and maintain a shortest-path flow.
- *
- * NOTE: The Flow Path argument does NOT contain all flow entries.
- * Instead, it contains a single dummy flow entry that is used to
- * store the matching condition(s).
- * That entry is replaced by the appropriate entries from the
- * internally performed shortest-path computation.
- *
- * @param flowPath the Flow Path with the endpoints and the match
- * conditions to install.
- * @return the added shortest-path flow on success, otherwise null.
- */
- FlowPath addAndMaintainShortestPathFlow(FlowPath flowPath);
/**
* Get the network topology.
@@ -106,7 +71,7 @@
* @return the network topology.
*/
Topology getTopology();
-
+
/**
* Get a globally unique flow ID from the flow service.
* NOTE: Not currently guaranteed to be globally unique.
@@ -122,4 +87,14 @@
* @param flowEntryId the Flow Entry ID of the expired Flow Entry.
*/
public void flowEntryOnSwitchExpired(IOFSwitch sw, FlowEntryId flowEntryId);
+
+ /**
+ * Inform the Flow Manager that a collection of Flow Entries has been
+ * pushed to a switch.
+ *
+ * @param entries the collection of <IOFSwitch, FlowEntry> pairs
+ * that have been pushed.
+ */
+ public void flowEntriesPushedToSwitch(
+ Collection<Pair<IOFSwitch, FlowEntry>> entries);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java
index 0926f91..9afaaec 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java
@@ -64,9 +64,9 @@
// Process the request
if (flowPath != null) {
- if (flowService.addFlow(flowPath, result) != true) {
- result = new FlowId(); // Error: Return empty Flow Id
- }
+ FlowId addedFlowId = flowService.addFlow(flowPath);
+ if (addedFlowId != null)
+ result = addedFlowId;
}
return result;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddShortestPathFlowResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddShortestPathFlowResource.java
index 7a4e88c..4d03623 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddShortestPathFlowResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddShortestPathFlowResource.java
@@ -64,13 +64,9 @@
// Process the request
if (flowPath != null) {
- FlowPath addedFlowPath =
- flowService.addAndMaintainShortestPathFlow(flowPath);
- if (addedFlowPath == null) {
- result = new FlowId(); // Error: Return empty Flow Id
- } else {
- result = addedFlowPath.flowId();
- }
+ FlowId addedFlowId = flowService.addFlow(flowPath);
+ if (addedFlowId != null)
+ result = addedFlowId;
}
return result;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
index 81d26dd..c358263 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
@@ -20,8 +20,6 @@
router.attach("/add-shortest-path/json", AddShortestPathFlowResource.class);
router.attach("/delete/{flow-id}/json", DeleteFlowResource.class);
router.attach("/get/{flow-id}/json", GetFlowByIdResource.class);
- router.attach("/getall-by-installer-id/{installer-id}/{src-dpid}/{src-port}/{dst-dpid}/{dst-port}/json", GetAllFlowsByInstallerIdResource.class);
- router.attach("/getall-by-endpoints/{src-dpid}/{src-port}/{dst-dpid}/{dst-port}/json", GetAllFlowsByEndpointsResource.class);
router.attach("/getall/json", GetAllFlowsResource.class);
router.attach("/getsummary/{flow-id}/{max-flows}/json", GetSummaryFlowsResource.class);
return router;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByEndpointsResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByEndpointsResource.java
deleted file mode 100644
index 1ac98c0..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByEndpointsResource.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package net.onrc.onos.ofcontroller.flowmanager.web;
-
-import java.util.ArrayList;
-
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
-import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
-import net.onrc.onos.ofcontroller.util.Dpid;
-import net.onrc.onos.ofcontroller.util.FlowPath;
-import net.onrc.onos.ofcontroller.util.Port;
-import net.onrc.onos.ofcontroller.util.SwitchPort;
-
-import org.restlet.resource.Get;
-import org.restlet.resource.ServerResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Flow Manager REST API implementation: Get all Flow state for given
- * source and destination switches and ports.
- *
- * The "{src-dpid}" request attribute value is the source DPID of the flows to
- * get.
- * The "{src-port}" request attribute value is the source port of the flows to
- * get.
- * The "{dst-dpid}" request attribute value is the destination DPID of the
- * flows to get.
- * The "{dst-port}" request attribute value is the destination port of the
- * flows to get.
- *
- * GET /wm/flow/getall-by-endpoints/{src-dpid}/{src-port}/{dst-dpid}/{dst-port}/json"
- */
-public class GetAllFlowsByEndpointsResource extends ServerResource {
- protected final static Logger log = LoggerFactory.getLogger(GetAllFlowsByEndpointsResource.class);
-
- /**
- * Implement the API.
- *
- * @return the collection of Flow states if any found, otherwise null.
- */
- @Get("json")
- public ArrayList<FlowPath> retrieve() {
- ArrayList<FlowPath> result = null;
-
- IFlowService flowService =
- (IFlowService)getContext().getAttributes().
- get(IFlowService.class.getCanonicalName());
-
- if (flowService == null) {
- log.debug("ONOS Flow Service not found");
- return result;
- }
-
- // Extract the arguments
- String srcDpidStr = (String) getRequestAttributes().get("src-dpid");
- String srcPortStr = (String) getRequestAttributes().get("src-port");
- String dstDpidStr = (String) getRequestAttributes().get("dst-dpid");
- String dstPortStr = (String) getRequestAttributes().get("dst-port");
-
- log.debug("Get All Flows Endpoints: " + srcDpidStr + "--" +
- srcPortStr + "--" + dstDpidStr + "--" + dstPortStr);
-
- Dpid srcDpid = new Dpid(srcDpidStr);
- Port srcPort = new Port(Short.parseShort(srcPortStr));
- Dpid dstDpid = new Dpid(dstDpidStr);
- Port dstPort = new Port(Short.parseShort(dstPortStr));
- SwitchPort srcSwitchPort = new SwitchPort(srcDpid, srcPort);
- SwitchPort dstSwitchPort = new SwitchPort(dstDpid, dstPort);
- DataPathEndpoints dataPathEndpoints =
- new DataPathEndpoints(srcSwitchPort, dstSwitchPort);
-
- result = flowService.getAllFlows(dataPathEndpoints);
-
- return result;
- }
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByInstallerIdResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByInstallerIdResource.java
deleted file mode 100644
index 870548e..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByInstallerIdResource.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package net.onrc.onos.ofcontroller.flowmanager.web;
-
-import java.util.ArrayList;
-
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
-import net.onrc.onos.ofcontroller.util.CallerId;
-import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
-import net.onrc.onos.ofcontroller.util.Dpid;
-import net.onrc.onos.ofcontroller.util.FlowPath;
-import net.onrc.onos.ofcontroller.util.Port;
-import net.onrc.onos.ofcontroller.util.SwitchPort;
-
-import org.restlet.resource.Get;
-import org.restlet.resource.ServerResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Flow Manager REST API implementation: Get all Flow state for a given
- * Installer ID and given source and destination switches and ports.
- *
- * The "{installer-id}" request attribute value is the Installer ID of the
- * flows to get.
- * The "{src-dpid}" request attribute value is the source DPID of the flows to
- * get.
- * The "{src-port}" request attribute value is the source port of the flows to
- * get.
- * The "{dst-dpid}" request attribute value is the destination DPID of the
- * flows to get.
- * The "{dst-port}" request attribute value is the destination port of the
- * flows to get.
- *
- * GET /wm/flow/getall-by-installer-id/{installer-id}/{src-dpid}/{src-port}/{dst-dpid}/{dst-port}/json"
- */
-public class GetAllFlowsByInstallerIdResource extends ServerResource {
- protected final static Logger log = LoggerFactory.getLogger(GetAllFlowsByInstallerIdResource.class);
-
- /**
- * Implement the API.
- *
- * @return the collection of Flow states if any found, otherwise null.
- */
- @Get("json")
- public ArrayList<FlowPath> retrieve() {
- ArrayList<FlowPath> result = null;
-
- IFlowService flowService =
- (IFlowService)getContext().getAttributes().
- get(IFlowService.class.getCanonicalName());
-
- if (flowService == null) {
- log.debug("ONOS Flow Service not found");
- return result;
- }
-
- // Extract the arguments
- String installerIdStr = (String) getRequestAttributes().get("installer-id");
- String srcDpidStr = (String) getRequestAttributes().get("src-dpid");
- String srcPortStr = (String) getRequestAttributes().get("src-port");
- String dstDpidStr = (String) getRequestAttributes().get("dst-dpid");
- String dstPortStr = (String) getRequestAttributes().get("dst-port");
-
- log.debug("Get All Flow By Installer: " + installerIdStr +
- " Endpoints: " +
- srcDpidStr + "--" + srcPortStr + "--" +
- dstDpidStr + "--" + dstPortStr);
-
- CallerId installerId = new CallerId(installerIdStr);
- Dpid srcDpid = new Dpid(srcDpidStr);
- Port srcPort = new Port(Short.parseShort(srcPortStr));
- Dpid dstDpid = new Dpid(dstDpidStr);
- Port dstPort = new Port(Short.parseShort(dstPortStr));
- SwitchPort srcSwitchPort = new SwitchPort(srcDpid, srcPort);
- SwitchPort dstSwitchPort = new SwitchPort(dstDpid, dstPort);
- DataPathEndpoints dataPathEndpoints =
- new DataPathEndpoints(srcSwitchPort, dstSwitchPort);
-
- result = flowService.getAllFlows(installerId, dataPathEndpoints);
-
- return result;
- }
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 8f2469b..c3c7107 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -3,9 +3,11 @@
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -27,8 +29,7 @@
import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.floodlightcontroller.util.MACAddress;
import net.floodlightcontroller.util.OFMessageDamper;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
import net.onrc.onos.ofcontroller.util.FlowEntryAction;
import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
import net.onrc.onos.ofcontroller.util.FlowEntry;
@@ -36,8 +37,8 @@
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
-import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.IPv4Net;
+import net.onrc.onos.ofcontroller.util.Pair;
import net.onrc.onos.ofcontroller.util.Port;
/**
@@ -54,6 +55,7 @@
*/
public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+ protected volatile IFlowService flowManager;
// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
@@ -157,7 +159,7 @@
// wait for message pushed to queue
mutex.acquire();
} catch (InterruptedException e) {
- e.printStackTrace();
+ // not an error
log.debug("FlowPusherThread is interrupted");
return;
}
@@ -224,7 +226,7 @@
OFMessage msg = queue.poll();
try {
messageDamper.write(sw, msg, context);
- log.debug("Pusher sends message : {}", msg);
+// log.debug("Pusher sends message : {}", msg);
size += msg.getLength();
} catch (IOException e) {
e.printStackTrace();
@@ -270,6 +272,7 @@
this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
+ flowManager = modContext.getServiceImpl(IFlowService.class);
if (damper != null) {
messageDamper = damper;
@@ -327,6 +330,13 @@
synchronized (queue) {
if (queue.state == QueueState.SUSPENDED) {
queue.state = QueueState.READY;
+
+ // Wake up the pusher thread if the queue is not empty
+ FlowPusherThread thread = getProcess(sw);
+ if (! queue.isEmpty() &&
+ thread.mutex.availablePermits() == 0) {
+ thread.mutex.release();
+ }
return true;
}
return false;
@@ -430,7 +440,7 @@
synchronized (queue) {
queue.add(msg);
- log.debug("Message is pushed : {}", msg);
+// log.debug("Message is pushed : {}", msg);
}
if (proc.mutex.availablePermits() == 0) {
@@ -439,314 +449,46 @@
return true;
}
-
+
@Override
- public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
- log.debug("sending : {}, {}", sw, flowObj);
- String flowEntryIdStr = flowEntryObj.getFlowEntryId();
- if (flowEntryIdStr == null)
- return false;
- FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
- String userState = flowEntryObj.getUserState();
- if (userState == null)
- return false;
+ public void pushFlowEntries(
+ Collection<Pair<IOFSwitch, FlowEntry>> entries) {
- //
- // Create the Open Flow Flow Modification Entry to push
- //
- OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
- long cookie = flowEntryId.value();
+ List<Pair<IOFSwitch, FlowEntry>> pushedEntries =
+ new LinkedList<Pair<IOFSwitch, FlowEntry>>();
- short flowModCommand = OFFlowMod.OFPFC_ADD;
- if (userState.equals("FE_USER_ADD")) {
- flowModCommand = OFFlowMod.OFPFC_ADD;
- } else if (userState.equals("FE_USER_MODIFY")) {
- flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
- } else if (userState.equals("FE_USER_DELETE")) {
- flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
- } else {
- // Unknown user state. Ignore the entry
- log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
- flowEntryId.toString(), userState);
- return false;
+ for (Pair<IOFSwitch, FlowEntry> entry : entries) {
+ if (add(entry.first, entry.second)) {
+ pushedEntries.add(entry);
+ }
}
//
- // Fetch the match conditions.
+ // TODO: We should use the OpenFlow Barrier mechanism
+ // to check for errors, and update the SwitchState
+ // for a flow entry after the Barrier message
+ // is received.
+ // Only after that, inform the Flow Manager that the entry is pushed.
//
- // NOTE: The Flow matching conditions common for all Flow Entries are
- // used ONLY if a Flow Entry does NOT have the corresponding matching
- // condition set.
- //
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
-
- // Match the Incoming Port
- Short matchInPort = flowEntryObj.getMatchInPort();
- if (matchInPort != null) {
- match.setInputPort(matchInPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
- }
-
- // Match the Source MAC address
- String matchSrcMac = flowEntryObj.getMatchSrcMac();
- if (matchSrcMac == null)
- matchSrcMac = flowObj.getMatchSrcMac();
- if (matchSrcMac != null) {
- match.setDataLayerSource(matchSrcMac);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
- }
-
- // Match the Destination MAC address
- String matchDstMac = flowEntryObj.getMatchDstMac();
- if (matchDstMac == null)
- matchDstMac = flowObj.getMatchDstMac();
- if (matchDstMac != null) {
- match.setDataLayerDestination(matchDstMac);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
- }
-
- // Match the Ethernet Frame Type
- Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType == null)
- matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType != null) {
- match.setDataLayerType(matchEthernetFrameType);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
- }
-
- // Match the VLAN ID
- Short matchVlanId = flowEntryObj.getMatchVlanId();
- if (matchVlanId == null)
- matchVlanId = flowObj.getMatchVlanId();
- if (matchVlanId != null) {
- match.setDataLayerVirtualLan(matchVlanId);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
- }
-
- // Match the VLAN priority
- Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
- if (matchVlanPriority == null)
- matchVlanPriority = flowObj.getMatchVlanPriority();
- if (matchVlanPriority != null) {
- match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
- }
-
- // Match the Source IPv4 Network prefix
- String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net == null)
- matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net != null) {
- match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
- }
-
- // Match the Destination IPv4 Network prefix
- String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net == null)
- matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net != null) {
- match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
- }
-
- // Match the IP protocol
- Byte matchIpProto = flowEntryObj.getMatchIpProto();
- if (matchIpProto == null)
- matchIpProto = flowObj.getMatchIpProto();
- if (matchIpProto != null) {
- match.setNetworkProtocol(matchIpProto);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
- }
-
- // Match the IP ToS (DSCP field, 6 bits)
- Byte matchIpToS = flowEntryObj.getMatchIpToS();
- if (matchIpToS == null)
- matchIpToS = flowObj.getMatchIpToS();
- if (matchIpToS != null) {
- match.setNetworkTypeOfService(matchIpToS);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
- }
-
- // Match the Source TCP/UDP port
- Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort == null)
- matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort != null) {
- match.setTransportSource(matchSrcTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
- }
-
- // Match the Destination TCP/UDP port
- Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort == null)
- matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort != null) {
- match.setTransportDestination(matchDstTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
- }
-
- //
- // Fetch the actions
- //
- Short actionOutputPort = null;
- List<OFAction> openFlowActions = new ArrayList<OFAction>();
- int actionsLen = 0;
- FlowEntryActions flowEntryActions = null;
- String actionsStr = flowEntryObj.getActions();
- if (actionsStr != null)
- flowEntryActions = new FlowEntryActions(actionsStr);
- else
- flowEntryActions = new FlowEntryActions();
- for (FlowEntryAction action : flowEntryActions.actions()) {
- ActionOutput actionOutput = action.actionOutput();
- ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
- ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
- ActionStripVlan actionStripVlan = action.actionStripVlan();
- ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
- ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
- ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
- ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
- ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
- ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
- ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
- ActionEnqueue actionEnqueue = action.actionEnqueue();
-
- if (actionOutput != null) {
- actionOutputPort = actionOutput.port().value();
- // XXX: The max length is hard-coded for now
- OFActionOutput ofa =
- new OFActionOutput(actionOutput.port().value(),
- (short)0xffff);
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanId != null) {
- OFActionVirtualLanIdentifier ofa =
- new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanPriority != null) {
- OFActionVirtualLanPriorityCodePoint ofa =
- new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionStripVlan != null) {
- if (actionStripVlan.stripVlan() == true) {
- OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- if (actionSetEthernetSrcAddr != null) {
- OFActionDataLayerSource ofa =
- new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetEthernetDstAddr != null) {
- OFActionDataLayerDestination ofa =
- new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4SrcAddr != null) {
- OFActionNetworkLayerSource ofa =
- new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4DstAddr != null) {
- OFActionNetworkLayerDestination ofa =
- new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIpToS != null) {
- OFActionNetworkTypeOfService ofa =
- new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpSrcPort != null) {
- OFActionTransportLayerSource ofa =
- new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpDstPort != null) {
- OFActionTransportLayerDestination ofa =
- new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionEnqueue != null) {
- OFActionEnqueue ofa =
- new OFActionEnqueue(actionEnqueue.port().value(),
- actionEnqueue.queueId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
- .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
- .setPriority(PRIORITY_DEFAULT)
- .setBufferId(OFPacketOut.BUFFER_ID_NONE)
- .setCookie(cookie)
- .setCommand(flowModCommand)
- .setMatch(match)
- .setActions(openFlowActions)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
- fm.setOutPort(OFPort.OFPP_NONE.getValue());
- if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
- (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
- if (actionOutputPort != null)
- fm.setOutPort(actionOutputPort);
- }
-
- //
- // TODO: Set the following flag
- // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
- // See method ForwardingBase::pushRoute()
- //
-
- //
- // Write the message to the switch
- //
- log.debug("MEASUREMENT: Installing flow entry " + userState +
- " into switch DPID: " +
- sw.getStringId() +
- " flowEntryId: " + flowEntryId.toString() +
- " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
- " inPort: " + matchInPort + " outPort: " + actionOutputPort
- );
- add(sw,fm);
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and update the SwitchState
- // for a flow entry after the Barrier message is
- // is received.
- //
- flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
-
- return true;
+ flowManager.flowEntriesPushedToSwitch(pushedEntries);
}
-
+
@Override
- public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
+ public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
+     // Wrap the single (switch, flow entry) pair in a one-element
+     // collection and hand it to the batch method, so that both entry
+     // points share the same push code path.
+     final Collection<Pair<IOFSwitch, FlowEntry>> singleEntry =
+         new LinkedList<Pair<IOFSwitch, FlowEntry>>();
+     final Pair<IOFSwitch, FlowEntry> pair =
+         new Pair<IOFSwitch, FlowEntry>(sw, flowEntry);
+     singleEntry.add(pair);
+
+     pushFlowEntries(singleEntry);
+ }
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
+ private boolean add(IOFSwitch sw, FlowEntry flowEntry) {
//
// Create the OpenFlow Flow Modification Entry to push
//
@@ -1003,24 +745,14 @@
//
// Write the message to the switch
//
- log.debug("MEASUREMENT: Installing flow entry "
+ log.debug("Installing flow entry "
+ flowEntry.flowEntryUserState() + " into switch DPID: "
+ sw.getStringId() + " flowEntryId: "
+ flowEntry.flowEntryId().toString() + " srcMac: "
+ matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
+ matchInPort + " outPort: " + actionOutputPort);
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and update the SwitchState
- // for a flow entry after the Barrier message is
- // is received.
- //
- // TODO: The FlowEntry Object in Titan should be set
- // to FE_SWITCH_UPDATED.
- //
-
- return add(sw,fm);
+ return add(sw, fm);
}
@Override
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index c62bd5d..351df84 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -26,7 +26,9 @@
import net.onrc.onos.graph.GraphDBManager;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
+import net.onrc.onos.ofcontroller.flowmanager.FlowDatabaseOperation;
import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
/**
@@ -237,7 +239,15 @@
return;
}
- pusher.add(sw, iFlowEntry.getFlow(), iFlowEntry);
+ FlowEntry flowEntry =
+ FlowDatabaseOperation.extractFlowEntry(iFlowEntry);
+ if (flowEntry == null) {
+ log.error("Cannot add flow entry {} to sw {} : flow entry cannot be extracted",
+ flowEntryId, sw.getId());
+ return;
+ }
+
+ pusher.pushFlowEntry(sw, flowEntry);
}
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index 20a6249..6bf20d9 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -1,15 +1,15 @@
package net.onrc.onos.ofcontroller.flowprogrammer;
+import java.util.Collection;
+
import org.openflow.protocol.OFBarrierReply;
import org.openflow.protocol.OFMessage;
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.internal.OFMessageFuture;
import net.floodlightcontroller.core.module.IFloodlightService;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
import net.onrc.onos.ofcontroller.util.FlowEntry;
-import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.Pair;
/**
* FlowPusherService is a service to send message to switches in proper rate.
@@ -50,6 +50,9 @@
/**
* Add a message to the queue of the switch.
+ *
+ * Note: Notification is NOT delivered for the pushed message.
+ *
* @param sw Switch to which message is pushed.
* @param msg Message object to be added.
* @return true if message is successfully added to a queue.
@@ -57,22 +60,28 @@
boolean add(IOFSwitch sw, OFMessage msg);
/**
- * Create a message from FlowEntry and add it to the queue of the switch.
+ * Push a collection of Flow Entries to the corresponding switches.
+ *
+ * Note: Notification is delivered for the Flow Entries that
+ * are pushed successfully.
+ *
+ * @param entries the collection of {@code <IOFSwitch, FlowEntry>} pairs
+ * to push.
+ */
+ void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries);
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the
+ * switch.
+ *
+ * Note: Notification is delivered for the Flow Entries that
+ * are pushed successfully.
+ *
* @param sw Switch to which message is pushed.
- * @param flowPath FlowPath object used for creating message.
* @param flowEntry FlowEntry object used for creating message.
* @return true if message is successfully added to a queue.
*/
- boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry);
-
- /**
- * Create a message from IFlowEntry and add it to the queue of the switch.
- * @param sw Switch to which message is pushed.
- * @param flowObj IFlowPath object used for creating message.
- * @param flowEntryObj IFlowEntry object used for creating message.
- * @return true if message is successfully added to a queue.
- */
- boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj);
+ void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry);
/**
* Set sending rate to a switch.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoInterruptResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoInterruptResource.java
new file mode 100644
index 0000000..3c2920d
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoInterruptResource.java
@@ -0,0 +1,44 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: interrupt synchronization to a
+ * switch.
+ *
+ *   GET /wm/fprog/synchronizer/interrupt/{dpid}/json
+ */
+public class DoInterruptResource extends SynchronizerResource {
+
+    /**
+     * Handle the GET request.
+     *
+     * Resolves the services through the inherited {@code init()}, converts
+     * the {@code dpid} request attribute with {@code HexString.toLong()},
+     * looks the switch up in the provider's switch map and calls
+     * {@code synchronizer.interrupt()} for it.
+     *
+     * @return true if the interrupt call was made; false if the services
+     * could not be resolved, the dpid could not be parsed, or the provider
+     * has no switch with that dpid.
+     */
+    @Get("json")
+    public boolean retrieve() {
+        if (!init()) {
+            return false;
+        }
+
+        String dpidText = (String) getRequestAttributes().get("dpid");
+        long switchId;
+        try {
+            switchId = HexString.toLong(dpidText);
+        } catch (NumberFormatException e) {
+            log.error("Invalid number format");
+            return false;
+        }
+
+        IOFSwitch target = provider.getSwitches().get(switchId);
+        if (target != null) {
+            synchronizer.interrupt(target);
+            return true;
+        }
+
+        // No switch is registered under the requested dpid.
+        log.error("Invalid dpid");
+        return false;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoSynchronizeResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoSynchronizeResource.java
new file mode 100644
index 0000000..dc8d431
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoSynchronizeResource.java
@@ -0,0 +1,44 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: begin synchronization to a switch.
+ *
+ *   GET /wm/fprog/synchronizer/sync/{dpid}/json
+ */
+public class DoSynchronizeResource extends SynchronizerResource {
+
+    /**
+     * Handle the GET request.
+     *
+     * Resolves the services through the inherited {@code init()}, converts
+     * the {@code dpid} request attribute with {@code HexString.toLong()},
+     * looks the switch up in the provider's switch map and calls
+     * {@code synchronizer.synchronize()} for it.
+     *
+     * @return true if the synchronize call was made; false if the services
+     * could not be resolved, the dpid could not be parsed, or the provider
+     * has no switch with that dpid.
+     */
+    @Get("json")
+    public boolean retrieve() {
+        if (!init()) {
+            return false;
+        }
+
+        String dpidText = (String) getRequestAttributes().get("dpid");
+        long switchId;
+        try {
+            switchId = HexString.toLong(dpidText);
+        } catch (NumberFormatException e) {
+            log.error("Invalid number format");
+            return false;
+        }
+
+        IOFSwitch target = provider.getSwitches().get(switchId);
+        if (target != null) {
+            synchronizer.synchronize(target);
+            return true;
+        }
+
+        // No switch is registered under the requested dpid.
+        log.error("Invalid dpid");
+        return false;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java
index 21e5bfb..22450f7 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java
@@ -11,10 +11,12 @@
@Override
public Restlet getRestlet(Context context) {
Router router = new Router(context);
- router.attach("/setrate/{dpid}/{rate}/json", SetPushRateResource.class);
- router.attach("/suspend/{dpid}/json", SuspendPusherResource.class);
- router.attach("/resume/{dpid}/json", ResumePusherResource.class);
- router.attach("/barrier/{dpid}/json", SendBarrierResource.class);
+ router.attach("/pusher/setrate/{dpid}/{rate}/json", SetPushRateResource.class);
+ router.attach("/pusher/suspend/{dpid}/json", SuspendPusherResource.class);
+ router.attach("/pusher/resume/{dpid}/json", ResumePusherResource.class);
+ router.attach("/pusher/barrier/{dpid}/json", SendBarrierResource.class);
+ router.attach("/synchronizer/sync/{dpid}/json", DoSynchronizeResource.class);
+ router.attach("/synchronizer/interrupt/{dpid}/json", DoInterruptResource.class);
return router;
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java
index ca1ec00..dcbe3e9 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java
@@ -8,7 +8,7 @@
/**
* FlowProgrammer REST API implementation: Resume sending message to switch.
*
- * GET /wm/fprog/resume/{dpid}/json"
+ * GET /wm/fprog/pusher/resume/{dpid}/json
*/
public class ResumePusherResource extends PusherResource {
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java
index 9c348ff..33b666a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java
@@ -9,7 +9,7 @@
/**
* FlowProgrammer REST API implementation: Send barrier message to switch.
*
- * GET /wm/fprog/barrier/{dpid}/json"
+ * GET /wm/fprog/pusher/barrier/{dpid}/json
*/
public class SendBarrierResource extends PusherResource {
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java
index 08a728e..9431d65 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java
@@ -8,7 +8,7 @@
/**
* FlowProgrammer REST API implementation: Set sending rate to the switch.
*
- * GET /wm/fprog/setrate/{dpid}/{rate}/json"
+ * GET /wm/fprog/pusher/setrate/{dpid}/{rate}/json
*/
public class SetPushRateResource extends PusherResource {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java
index 39d245b..1a5266b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java
@@ -10,7 +10,7 @@
/**
* FlowProgrammer REST API implementation: Suspend sending message to switch.
*
- * GET /wm/fprog/suspend/{dpid}/json"
+ * GET /wm/fprog/pusher/suspend/{dpid}/json
*/
public class SuspendPusherResource extends PusherResource {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SynchronizerResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SynchronizerResource.java
new file mode 100644
index 0000000..12bf8f3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SynchronizerResource.java
@@ -0,0 +1,35 @@
+package net.onrc.onos.ofcontroller.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowSyncService;
+
+import org.restlet.resource.ServerResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for the flow synchronizer REST resources.
+ *
+ * Holds the Floodlight provider and flow synchronizer services and
+ * resolves both from the Restlet context attributes in {@link #init()}.
+ */
+public class SynchronizerResource extends ServerResource {
+    protected static final Logger log = LoggerFactory.getLogger(SynchronizerResource.class);
+
+    // Services resolved by init(); each stays unset until its lookup succeeds.
+    protected IFloodlightProviderService provider;
+    protected IFlowSyncService synchronizer;
+
+    /**
+     * Resolve the services from the Restlet context attributes.
+     *
+     * The provider is looked up first; the synchronizer is looked up only
+     * if the provider was found.
+     *
+     * @return true if both services were found, otherwise false.
+     */
+    protected boolean init() {
+        provider = (IFloodlightProviderService)
+            lookupService(IFloodlightProviderService.class);
+        if (provider == null) {
+            log.debug("ONOS FloodlightProvider not found");
+            return false;
+        }
+
+        synchronizer = (IFlowSyncService)
+            lookupService(IFlowSyncService.class);
+        if (synchronizer == null) {
+            log.debug("ONOS FlowSyncService not found");
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Fetch the context attribute stored under a class's canonical name.
+     *
+     * @param serviceClass the class whose canonical name is the attribute key.
+     * @return the attribute value, or null if there is none.
+     */
+    private Object lookupService(Class<?> serviceClass) {
+        return getContext().getAttributes().
+            get(serviceClass.getCanonicalName());
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index e5144ab..05de13e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -1,6 +1,9 @@
package net.onrc.onos.ofcontroller.forwarding;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -29,7 +32,11 @@
import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFPacketIn;
+import org.openflow.protocol.OFPacketOut;
+import org.openflow.protocol.OFPort;
import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.OFAction;
+import org.openflow.protocol.action.OFActionOutput;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,6 +134,8 @@
ISwitchObject switchObject = portObject.getSwitch();
long destinationDpid = HexString.toLong(switchObject.getDPID());
+ // TODO SwitchPort, Dpid and Port should probably be immutable
+ // (also, are Dpid and Port even necessary?)
SwitchPort srcSwitchPort = new SwitchPort(
new Dpid(sw.getId()), new Port(pi.getInPort()));
SwitchPort dstSwitchPort = new SwitchPort(
@@ -139,7 +148,31 @@
dstSwitchPort, dstMacAddress)) {
log.debug("Not adding flow because it already exists");
- // Don't do anything if the flow already exists
+ // TODO check reverse flow as well
+
+ DataPath shortestPath =
+ topologyService.getDatabaseShortestPath(srcSwitchPort, dstSwitchPort);
+
+ if (shortestPath == null || shortestPath.flowEntries().isEmpty()) {
+ log.warn("No path found between {} and {} - not handling packet",
+ srcSwitchPort, dstSwitchPort);
+ return;
+ }
+
+ Port outPort = shortestPath.flowEntries().get(0).outPort();
+ forwardPacket(pi, sw, outPort.value());
+ return;
+ }
+
+ // Calculate a shortest path before pushing flow mods.
+ // This will be used later by the packet-out processing, but it uses
+ // the database so will be slow, and we should do it before flow mods.
+ DataPath shortestPath =
+ topologyService.getDatabaseShortestPath(srcSwitchPort, dstSwitchPort);
+
+ if (shortestPath == null || shortestPath.flowEntries().isEmpty()) {
+ log.warn("No path found between {} and {} - not handling packet",
+ srcSwitchPort, dstSwitchPort);
return;
}
@@ -151,21 +184,53 @@
dataPath.setSrcPort(srcSwitchPort);
dataPath.setDstPort(dstSwitchPort);
- FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
+ CallerId callerId = new CallerId("Forwarding");
+
+ //FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
FlowPath flowPath = new FlowPath();
- flowPath.setFlowId(flowId);
- flowPath.setInstallerId(new CallerId("Forwarding"));
+ //flowPath.setFlowId(flowId);
+ flowPath.setInstallerId(callerId);
+
flowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
flowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
flowPath.setFlowEntryMatch(new FlowEntryMatch());
flowPath.flowEntryMatch().enableSrcMac(srcMacAddress);
flowPath.flowEntryMatch().enableDstMac(dstMacAddress);
// For now just forward IPv4 packets. This prevents accidentally
- // other stuff like ARP.
+ // forwarding other stuff like ARP.
flowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
flowPath.setDataPath(dataPath);
+
+ FlowId flowId = flowService.addFlow(flowPath);
+ //flowService.addFlow(flowPath, flowId);
- flowService.addFlow(flowPath, flowId);
+
+ DataPath reverseDataPath = new DataPath();
+ // Reverse the ports for the reverse path
+ reverseDataPath.setSrcPort(dstSwitchPort);
+ reverseDataPath.setDstPort(srcSwitchPort);
+
+ //FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
+ // TODO implement copy constructor for FlowPath
+ FlowPath reverseFlowPath = new FlowPath();
+ //reverseFlowPath.setFlowId(reverseFlowId);
+ reverseFlowPath.setInstallerId(callerId);
+ reverseFlowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
+ reverseFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
+ reverseFlowPath.setFlowEntryMatch(new FlowEntryMatch());
+ // Reverse the MAC addresses for the reverse path
+ reverseFlowPath.flowEntryMatch().enableSrcMac(dstMacAddress);
+ reverseFlowPath.flowEntryMatch().enableDstMac(srcMacAddress);
+ reverseFlowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
+ reverseFlowPath.setDataPath(reverseDataPath);
+ reverseFlowPath.dataPath().srcPort().dpid().toString();
+
+ // TODO what happens if no path exists?
+ //flowService.addFlow(reverseFlowPath, reverseFlowId);
+ FlowId reverseFlowId = flowService.addFlow(reverseFlowPath);
+
+ Port outPort = shortestPath.flowEntries().get(0).outPort();
+ forwardPacket(pi, sw, outPort.value());
}
private boolean flowExists(SwitchPort srcPort, MACAddress srcMac,
@@ -194,4 +259,31 @@
return false;
}
+ /**
+  * Send the packet carried by a packet-in message out of a single port
+  * of the switch.
+  *
+  * If the switch reports no buffers, or the packet-in does not reference
+  * a buffer, the raw packet data is copied into the packet-out.
+  * Otherwise the packet-out references the buffer named by the packet-in.
+  *
+  * @param pi the packet-in message whose packet is forwarded.
+  * @param sw the switch the packet-out is written to.
+  * @param port the output port number.
+  */
+ private void forwardPacket(OFPacketIn pi, IOFSwitch sw, short port) {
+     List<OFAction> actions = new ArrayList<OFAction>(1);
+     actions.add(new OFActionOutput(port));
+
+     // The in-port is set once, to the packet-in's port (the earlier
+     // setInPort(OFPP_NONE) call was immediately overwritten).
+     OFPacketOut po = new OFPacketOut();
+     po.setInPort(pi.getInPort())
+       .setActions(actions)
+       .setActionsLength((short) OFActionOutput.MINIMUM_LENGTH)
+       .setLengthU(OFPacketOut.MINIMUM_LENGTH + OFActionOutput.MINIMUM_LENGTH);
+
+     // A packet-out with BUFFER_ID_NONE must carry the packet itself.
+     // That is the case when the switch has no buffers, and also when
+     // this particular packet-in was not buffered by the switch.
+     if (sw.getBuffers() == 0 ||
+         pi.getBufferId() == OFPacketOut.BUFFER_ID_NONE) {
+         byte[] packetData = pi.getPacketData();
+         po.setBufferId(OFPacketOut.BUFFER_ID_NONE)
+           .setPacketData(packetData)
+           .setLengthU(po.getLengthU() + packetData.length);
+     } else {
+         po.setBufferId(pi.getBufferId());
+     }
+
+     try {
+         sw.write(po, null);
+         sw.flush();
+     } catch (IOException e) {
+         // Pass the exception as the trailing argument (no placeholder)
+         // so the logger records it as a throwable.
+         log.error("Error writing packet out to switch", e);
+     }
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
index ec8065c..f6d8c9c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
@@ -17,7 +17,6 @@
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
-import net.floodlightcontroller.devicemanager.IDevice;
import net.floodlightcontroller.packet.ARP;
import net.floodlightcontroller.packet.Ethernet;
import net.floodlightcontroller.packet.IPv4;
@@ -35,6 +34,9 @@
import net.onrc.onos.ofcontroller.core.internal.DeviceStorageImpl;
import net.onrc.onos.ofcontroller.core.internal.TopoLinkServiceImpl;
import net.onrc.onos.ofcontroller.core.internal.TopoSwitchServiceImpl;
+import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.Port;
+import net.onrc.onos.ofcontroller.util.SwitchPort;
import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFPacketIn;
@@ -246,21 +248,14 @@
public Command receive(
IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
- //if (msg.getType() != OFType.PACKET_IN){
- //return Command.CONTINUE;
- //}
- log.debug("received packet");
-
OFPacketIn pi = (OFPacketIn) msg;
Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
if (eth.getEtherType() == Ethernet.TYPE_ARP){
- log.debug("is arp");
ARP arp = (ARP) eth.getPayload();
if (arp.getOpCode() == ARP.OP_REQUEST) {
- log.debug("is request");
//TODO check what the DeviceManager does about propagating
//or swallowing ARPs. We want to go after DeviceManager in the
//chain but we really need it to CONTINUE ARP packets so we can
@@ -268,7 +263,6 @@
handleArpRequest(sw, pi, arp, eth);
}
else if (arp.getOpCode() == ARP.OP_REPLY) {
- log.debug("is reply");
//handleArpReply(sw, pi, arp);
}
}
@@ -548,6 +542,8 @@
}
private void broadcastArpRequestOutMyEdge(byte[] arpRequest) {
+ List<SwitchPort> switchPorts = new ArrayList<SwitchPort>();
+
for (IOFSwitch sw : floodlightProvider.getSwitches().values()) {
OFPacketOut po = new OFPacketOut();
@@ -565,6 +561,8 @@
for (IPortObject portObject : ports) {
if (!portObject.getLinkedPorts().iterator().hasNext()) {
+ switchPorts.add(new SwitchPort(new Dpid(sw.getId()),
+ new Port(portObject.getNumber())));
actions.add(new OFActionOutput(portObject.getNumber()));
}
}
@@ -583,6 +581,8 @@
log.error("Failure writing packet out to switch", e);
}
}
+
+ log.debug("Broadcast ARP request for to: {}", switchPorts);
}
private void sendArpRequestOutPort(byte[] arpRequest, long dpid, short port) {
@@ -722,13 +722,17 @@
@Override
public void arpRequestNotification(ArpMessage arpMessage) {
- log.debug("Received ARP notification from other instances");
+ //log.debug("Received ARP notification from other instances");
switch (arpMessage.getType()){
case REQUEST:
+ log.debug("Received ARP request notification for {}",
+ arpMessage.getAddress());
broadcastArpRequestOutMyEdge(arpMessage.getPacket());
break;
case REPLY:
+ log.debug("Received ARP reply notification for {}",
+ arpMessage.getAddress());
sendArpReplyToWaitingRequesters(arpMessage.getAddress());
break;
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/FlowPath.java b/src/main/java/net/onrc/onos/ofcontroller/util/FlowPath.java
index a720fc6..ab3edb1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/FlowPath.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/FlowPath.java
@@ -6,6 +6,7 @@
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
/**
@@ -210,6 +211,18 @@
}
/**
+     * Check whether this flow path carries a usable Flow ID.
+     *
+     * @return true if a Flow ID is set and it reports itself valid, otherwise false.
+     */
+    @JsonIgnore
+    public boolean isValidFlowId() {
+        if (this.flowId != null)
+            return this.flowId.isValid();
+        return false;
+    }
+
+ /**
* Get the Caller ID of the flow path installer.
*
* @return the Caller ID of the flow path installer.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/Pair.java b/src/main/java/net/onrc/onos/ofcontroller/util/Pair.java
new file mode 100644
index 0000000..2245758
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/Pair.java
@@ -0,0 +1,20 @@
+package net.onrc.onos.ofcontroller.util;
+
+/**
+ * A generic, mutable holder for a pair of two values of types F and S.
+ */
+public class Pair<F, S> {
+ public F first; // The first value in the pair (public and non-final, so callers may reassign it)
+ public S second; // The second value in the pair (public and non-final, so callers may reassign it)
+
+ /**
+ * Constructor for a pair of two values. The values are stored as given, without any checks.
+ *
+ * @param first the first value in the pair.
+ * @param second the second value in the pair.
+ */
+ public Pair(F first, S second) {
+ this.first = first;
+ this.second = second;
+ }
+}
diff --git a/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java b/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
index bbe7e31..9876c0d 100644
--- a/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
+++ b/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
@@ -142,7 +142,7 @@
/**
- * Test method for {@link FlowManager#addFlow(FlowPath, FlowId, String)}.
+ * Test method for {@link FlowManager#addFlow(FlowPath)}.
* @throws Exception
*/
@Test
@@ -163,15 +163,15 @@
replayAll();
fm.init(context);
- Boolean result = fm.addFlow(flowPath, flowId);
+ FlowId result = fm.addFlow(flowPath);
// verify the test
verifyAll();
- assertFalse(result);
+ assertNotNull(result);
}
/**
- * Test method for {@link FlowManager#addFlow(FlowPath, FlowId)}.
+ * Test method for {@link FlowManager#addFlow(FlowPath)}.
* @throws Exception
*/
@Test
@@ -233,11 +233,11 @@
replayAll();
fm.init(context);
- Boolean result = fm.addFlow(flowPath, new FlowId(0x100));
+ FlowId result = fm.addFlow(flowPath);
// verify the test
verifyAll();
- assertTrue(result);
+ assertNotNull(result);
}
/**
@@ -388,76 +388,6 @@
}
/**
- * Test method for {@link FlowManager#getAllFlows(CallerId, DataPathEndpoints)}.
- * @throws Exception
- */
- @Test
- public final void testGetAllFlowsWithCallerIdAndDataPathEndpointsSuccessNormally() throws Exception {
- final String getAllFlows = "getAllFlows";
- // create mock objects
- FlowManager fm = createPartialMock(FlowManager.class, getAllFlows,
- new Class<?>[]{}, new Object[]{});
-
- // instantiate required objects
- DataPathEndpoints dataPathEndpoints = new DataPathEndpoints(
- new SwitchPort(new Dpid(1), new Port((short)1)),
- new SwitchPort(new Dpid(2), new Port((short)2)));
-
- ArrayList<FlowPath> obtainedAllFlows = createTestFlowPaths();
-
- //setup expectations
- expectInitWithContext();
- expectPrivate(fm, getAllFlows).andReturn(obtainedAllFlows);
-
- //start the test
- replayAll();
-
- fm.init(context);
- ArrayList<FlowPath> flows = fm.getAllFlows(new CallerId("caller id"), dataPathEndpoints);
-
- // verify the test
- verifyAll();
- assertEquals(1, flows.size());
- assertEquals(obtainedAllFlows.get(1), flows.get(0));
- }
-
- /**
- * Test method for {@link FlowManager#getAllFlows(DataPathEndpoints)}.
- * @throws Exception
- */
- @Test
- public final void testGetAllFlowsWithDataPathEndpointsSuccessNormally() throws Exception {
- final String getAllFlows = "getAllFlows";
- // create mock objects
- FlowManager fm = createPartialMock(FlowManager.class, getAllFlows,
- new Class<?>[]{}, new Object[]{});
-
- // instantiate required objects
- DataPathEndpoints dataPathEndpoints = new DataPathEndpoints(
- new SwitchPort(new Dpid(1), new Port((short)1)),
- new SwitchPort(new Dpid(2), new Port((short)2)));
-
- ArrayList<FlowPath> obtainedAllFlows = createTestFlowPaths();
-
- //setup expectations
- expectInitWithContext();
- expectPrivate(fm, getAllFlows).andReturn(obtainedAllFlows);
-
- //start the test
- replayAll();
-
- fm.init(context);
- ArrayList<FlowPath> flows = fm.getAllFlows(dataPathEndpoints);
-
- // verify the test
- verifyAll();
- assertEquals(2, flows.size());
- assertEquals(obtainedAllFlows.get(0), flows.get(0));
- assertEquals(obtainedAllFlows.get(1), flows.get(1));
- // TODO: ignore the order of flows in the list
- }
-
- /**
* Test method for {@link FlowManager#getAllFlowsSummary(FlowId, int)}.
* @throws Exception
*/
@@ -533,72 +463,6 @@
// TODO: more asserts
// TODO: ignore seq. of the list
}
-
- /**
- * Test method for {@link FlowManager#addAndMaintainShortestPathFlow(FlowPath)}.
- * @throws Exception
- */
- @Test
- public final void testAddAndMaintainShortestPathFlowSuccessNormally() throws Exception {
- final String addFlow = "addFlow";
-
- // create mock objects
- FlowManager fm = createPartialMockAndInvokeDefaultConstructor(FlowManager.class, addFlow);
-
- // instantiate required objects
- DataPath dataPath = new DataPath();
- dataPath.setSrcPort(new SwitchPort(new Dpid(1), new Port((short)3)));
- dataPath.setDstPort(new SwitchPort(new Dpid(2), new Port((short)4)));
- FlowEntryMatch match = new FlowEntryMatch();
- FlowPath paramFlow = new FlowPath();
- paramFlow.setFlowId(new FlowId(100));
- paramFlow.setInstallerId(new CallerId("installer id"));
- paramFlow.setFlowPathType(FlowPathType.valueOf("FP_TYPE_SHORTEST_PATH"));
- paramFlow.setFlowPathUserState(FlowPathUserState.valueOf("FP_USER_ADD"));
- paramFlow.setFlowPathFlags(new FlowPathFlags(0));
- paramFlow.setDataPath(dataPath);
- paramFlow.setFlowEntryMatch(match);
-
- // setup expectations
- expectInitWithContext();
- expectPrivate(fm, addFlow,
- EasyMock.anyObject(FlowPath.class),
- EasyMock.anyObject(FlowId.class),
- EasyMock.anyObject(String.class)
- ).andAnswer(new IAnswer<Object>() {
- public Object answer() throws Exception {
- FlowPath flowPath = (FlowPath)EasyMock.getCurrentArguments()[0];
- assertEquals(flowPath.flowId().value(), 100);
- assertEquals(flowPath.installerId().toString(), "installer id");
- assertEquals(flowPath.flowPathType().toString(), "PF_TYPE_SHORTEST_PATH");
- assertEquals(flowPath.flowPathUserState().toString(), "PF_USER_STATE");
- assertEquals(flowPath.flowPathFlags().flags(), 0);
- assertEquals(flowPath.dataPath().srcPort().toString(),
- new SwitchPort(new Dpid(1), new Port((short)3)).toString());
-
- String dataPathSummary = (String)EasyMock.getCurrentArguments()[2];
- assertEquals(dataPathSummary, "X");
-
- return true;
- }
- });
-
- // start the test
- replayAll();
-
- fm.init(context);
- FlowPath resultFlow = fm.addAndMaintainShortestPathFlow(paramFlow);
-
- // verify the test
- verifyAll();
- assertEquals(paramFlow.flowId().value(), resultFlow.flowId().value());
- assertEquals(paramFlow.installerId().toString(), resultFlow.installerId().toString());
- assertEquals(paramFlow.flowPathType().toString(), resultFlow.flowPathType().toString());
- assertEquals(paramFlow.flowPathUserState().toString(), resultFlow.flowPathUserState().toString());
- assertEquals(paramFlow.flowPathFlags().flags(), resultFlow.flowPathFlags().flags());
- assertEquals(paramFlow.dataPath().toString(), resultFlow.dataPath().toString());
- assertEquals(paramFlow.flowEntryMatch().toString(), resultFlow.flowEntryMatch().toString());
- }
// INetMapStorage methods
diff --git a/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java
new file mode 100644
index 0000000..4779b75
--- /dev/null
+++ b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java
@@ -0,0 +1,544 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.floodlightcontroller.util.OFMessageDamper;
+import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryActions;
+import net.onrc.onos.ofcontroller.util.FlowEntryErrorState;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
+import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.Port;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Test;
+import org.openflow.protocol.OFBarrierRequest;
+import org.openflow.protocol.OFFlowMod;
+import org.openflow.protocol.OFMatch;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.OFAction;
+import org.openflow.protocol.factory.BasicFactory;
+
+public class FlowPusherTest {
+ private FlowPusher pusher;
+ private FloodlightContext context;
+ private FloodlightModuleContext modContext;
+ private BasicFactory factory;
+ private OFMessageDamper damper;
+ private IFloodlightProviderService flProviderService;
+ private IThreadPoolService threadPoolService;
+ private IFlowService flowService;
+
+ /**
+ * Adds one OFMessage for one switch and expects a single OFMessageDamper#write() and a single flush().
+ */
+ @Test
+ public void testAddMessage() {
+ beginInitMock();
+
+ OFMessage msg = EasyMock.createMock(OFMessage.class);
+ EasyMock.expect(msg.getXid()).andReturn(1).anyTimes();
+ EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(msg);
+
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+ sw.flush(); // record the void call; the next line requires it exactly once
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(sw);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
+ .andReturn(true).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+
+ endInitMock();
+ initPusher(1);
+
+ boolean add_result = pusher.add(sw, msg);
+ assertTrue(add_result);
+
+ try {
+ // give the started pusher one second to process the queued message.
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail("Failed in Thread.sleep()");
+ }
+
+ EasyMock.verify(msg);
+ EasyMock.verify(sw); // NOTE(review): damper is never verified here, so a missing write() would go unnoticed
+
+ pusher.stop();
+ }
+
+ /**
+ * Adds NUM_MSG OFMessages for one switch; each one is expected to be written once via OFMessageDamper.
+ */
+ @Test
+ public void testMassiveAddMessage() {
+ final int NUM_MSG = 10000;
+
+ beginInitMock();
+
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+ sw.flush(); // record the void call; the next line requires it at least once
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.replay(sw);
+
+ List<OFMessage> messages = new ArrayList<OFMessage>();
+
+ for (int i = 0; i < NUM_MSG; ++i) {
+ OFMessage msg = EasyMock.createMock(OFMessage.class);
+ EasyMock.expect(msg.getXid()).andReturn(i).anyTimes();
+ EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(msg);
+ messages.add(msg);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
+ .andReturn(true).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+ }
+
+ endInitMock();
+ initPusher(1);
+
+ for (OFMessage msg : messages) {
+ boolean add_result = pusher.add(sw, msg);
+ assertTrue(add_result);
+ }
+
+ try {
+ // give the started pusher one second to process all queued messages.
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail("Failed in Thread.sleep()");
+ }
+
+ for (OFMessage msg : messages) {
+ EasyMock.verify(msg);
+ }
+ EasyMock.verify(sw); // NOTE(review): damper is never verified here, so missing write() calls would go unnoticed
+
+ pusher.stop();
+ }
+
+ /**
+ * Adds NUM_MSG OFMessages for each of NUM_SWITCH switches, using a pusher created with initPusher(1).
+ */
+ @Test
+ public void testMultiSwitchAddMessage() {
+ final int NUM_SWITCH = 10;
+ final int NUM_MSG = 100; // messages per switch
+
+ beginInitMock();
+
+ Map<IOFSwitch, List<OFMessage>> sw_map = new HashMap<IOFSwitch, List<OFMessage>>();
+ for (int i = 0; i < NUM_SWITCH; ++i) {
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)i).anyTimes();
+ sw.flush(); // record the void call; the next line requires it at least once
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.replay(sw);
+
+ List<OFMessage> messages = new ArrayList<OFMessage>();
+
+ for (int j = 0; j < NUM_MSG; ++j) {
+ OFMessage msg = EasyMock.createMock(OFMessage.class);
+ EasyMock.expect(msg.getXid()).andReturn(j).anyTimes();
+ EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(msg);
+ messages.add(msg);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
+ .andReturn(true).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+ }
+ sw_map.put(sw, messages);
+ }
+
+ endInitMock();
+ initPusher(1);
+
+ for (IOFSwitch sw : sw_map.keySet()) {
+ for (OFMessage msg : sw_map.get(sw)) {
+ boolean add_result = pusher.add(sw, msg);
+ assertTrue(add_result);
+ }
+ }
+
+ try {
+ // give the started pusher one second to process all queued messages.
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail("Failed in Thread.sleep()");
+ }
+
+ for (IOFSwitch sw : sw_map.keySet()) {
+ for (OFMessage msg : sw_map.get(sw)) {
+ EasyMock.verify(msg);
+ }
+
+ EasyMock.verify(sw);
+ }
+
+ pusher.stop();
+ }
+
+ /**
+ * Adds NUM_MSG OFMessages for each of NUM_THREAD switches, using a pusher created with initPusher(NUM_THREAD).
+ */
+ @Test
+ public void testMultiThreadedAddMessage() {
+ final int NUM_THREAD = 10; // used both as the number of switches and as the initPusher() argument
+ final int NUM_MSG = 100; // messages per switch
+
+ beginInitMock();
+
+ Map<IOFSwitch, List<OFMessage>> sw_map = new HashMap<IOFSwitch, List<OFMessage>>();
+ for (int i = 0; i < NUM_THREAD; ++i) {
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)i).anyTimes();
+ sw.flush(); // record the void call; the next line requires it at least once
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.replay(sw);
+
+ List<OFMessage> messages = new ArrayList<OFMessage>();
+
+ for (int j = 0; j < NUM_MSG; ++j) {
+ OFMessage msg = EasyMock.createMock(OFMessage.class);
+ EasyMock.expect(msg.getXid()).andReturn(j).anyTimes();
+ EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(msg);
+ messages.add(msg);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
+ .andReturn(true).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+ }
+ sw_map.put(sw, messages);
+ }
+
+ endInitMock();
+ initPusher(NUM_THREAD);
+
+ for (IOFSwitch sw : sw_map.keySet()) {
+ for (OFMessage msg : sw_map.get(sw)) {
+ boolean add_result = pusher.add(sw, msg);
+ assertTrue(add_result);
+ }
+ }
+
+ try {
+ // give the started pusher one second to process all queued messages.
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail("Failed in Thread.sleep()");
+ }
+
+ for (IOFSwitch sw : sw_map.keySet()) {
+ for (OFMessage msg : sw_map.get(sw)) {
+ EasyMock.verify(msg);
+ }
+
+ EasyMock.verify(sw);
+ }
+
+ pusher.stop();
+ }
+
+    private volatile long barrierTime = 0; // set inside the damper answer, polled by the test thread
+    /**
+     * Test that a rate-limited queue is not drained faster than the configured rate (plus a margin).
+     */
+    @Test
+    public void testRateLimitedAddMessage() {
+        final long LIMIT_RATE = 100; // [bytes/ms]
+        final int NUM_MSG = 1000;
+
+        // The accuracy of FlowPusher's own rate calculation can't be measured in a unit test
+        // because the mock switch never returns a BARRIER_REPLY.
+        // Instead the rate is approximated from the time at which the barrier request is
+        // written; ACCEPTABLE_RATE is the limit plus a 20% margin for that approximation.
+        final double ACCEPTABLE_RATE = LIMIT_RATE * 1.2;
+
+        beginInitMock();
+
+        IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+        EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+        sw.flush();
+        EasyMock.expectLastCall().atLeastOnce();
+        prepareBarrier(sw); // must be recorded before sw is replayed
+        EasyMock.replay(sw);
+
+        List<OFMessage> messages = new ArrayList<OFMessage>();
+
+        for (int i = 0; i < NUM_MSG; ++i) {
+            OFMessage msg = EasyMock.createMock(OFMessage.class);
+            EasyMock.expect(msg.getXid()).andReturn(1).anyTimes();
+            EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+            EasyMock.expect(msg.getLengthU()).andReturn(100).anyTimes();
+            EasyMock.replay(msg);
+            messages.add(msg);
+
+            try {
+                EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
+                    .andReturn(true).once();
+            } catch (IOException e) {
+                fail("Failed in OFMessageDamper#write()");
+            }
+        }
+
+        try {
+            EasyMock.expect(damper.write(EasyMock.eq(sw), (OFMessage)EasyMock.anyObject(), EasyMock.eq(context)))
+                .andAnswer(new IAnswer<Boolean>() {
+                    @Override
+                    public Boolean answer() throws Throwable {
+                        OFMessage msg = (OFMessage)EasyMock.getCurrentArguments()[1];
+                        if (msg.getType() == OFType.BARRIER_REQUEST) {
+                            barrierTime = System.currentTimeMillis(); // time the barrier request was written
+                        }
+                        return true;
+                    }
+                }).once();
+        } catch (IOException e1) {
+            fail("Failed in OFMessageDamper#write()");
+        }
+
+        endInitMock();
+        initPusher(1);
+
+        pusher.createQueue(sw);
+        pusher.setRate(sw, LIMIT_RATE);
+
+        long beginTime = System.currentTimeMillis();
+        for (OFMessage msg : messages) {
+            boolean add_result = pusher.add(sw, msg);
+            assertTrue(add_result);
+        }
+
+        pusher.barrierAsync(sw);
+
+        try {
+            do {
+                Thread.sleep(1000);
+            } while (barrierTime == 0); // volatile read: sees the write made by the answer above
+        } catch (InterruptedException e) {
+            fail("Failed to sleep");
+        }
+
+        double measured_rate = NUM_MSG * 100.0 / (barrierTime - beginTime); // bytes/ms; 100.0 forces floating-point division
+        assertTrue(measured_rate < ACCEPTABLE_RATE);
+
+        for (OFMessage msg : messages) {
+            EasyMock.verify(msg);
+        }
+        EasyMock.verify(sw);
+
+        pusher.stop();
+    }
+
+ /**
+ * Requests a barrier for a switch and checks that barrierAsync() hands back a non-null future.
+ */
+ @Test
+ public void testBarrierMessage() {
+ beginInitMock();
+
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+ sw.flush();
+ EasyMock.expectLastCall().atLeastOnce();
+ prepareBarrier(sw); // records the barrier-related expectations before sw is replayed
+ EasyMock.replay(sw);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), (OFMessage)EasyMock.anyObject(), EasyMock.eq(context)))
+ .andReturn(true).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+
+ endInitMock();
+ initPusher(1);
+
+ OFBarrierReplyFuture future = pusher.barrierAsync(sw);
+
+ assertNotNull(future); // NOTE(review): no mock is verified in this test; only the returned future is checked
+ pusher.stop();
+ }
+
+ /**
+ * Pushes a FlowEntry via pushFlowEntry() and checks that the message written for the switch is a FLOW_MOD.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAddFlow() {
+ // The FlowEntry setup below is copied from FlowManagerTest
+
+ // instantiate required objects
+ FlowEntry flowEntry1 = new FlowEntry();
+ flowEntry1.setDpid(new Dpid(1));
+ flowEntry1.setFlowId(new FlowId(1));
+ flowEntry1.setInPort(new Port((short) 1));
+ flowEntry1.setOutPort(new Port((short) 11));
+ flowEntry1.setFlowEntryId(new FlowEntryId(1));
+ flowEntry1.setFlowEntryMatch(new FlowEntryMatch());
+ flowEntry1.setFlowEntryActions(new FlowEntryActions());
+ flowEntry1.setFlowEntryErrorState(new FlowEntryErrorState());
+ flowEntry1.setFlowEntryUserState(FlowEntryUserState.FE_USER_ADD);
+
+ beginInitMock();
+
+ OFFlowMod msg = EasyMock.createMock(OFFlowMod.class); // the FLOW_MOD the mocked factory hands out below
+ EasyMock.expect(msg.setIdleTimeout(EasyMock.anyShort())).andReturn(msg);
+ EasyMock.expect(msg.setHardTimeout(EasyMock.anyShort())).andReturn(msg);
+ EasyMock.expect(msg.setPriority(EasyMock.anyShort())).andReturn(msg);
+ EasyMock.expect(msg.setBufferId(EasyMock.anyInt())).andReturn(msg);
+ EasyMock.expect(msg.setCookie(EasyMock.anyLong())).andReturn(msg);
+ EasyMock.expect(msg.setCommand(EasyMock.anyShort())).andReturn(msg);
+ EasyMock.expect(msg.setMatch(EasyMock.anyObject(OFMatch.class))).andReturn(msg);
+ EasyMock.expect(msg.setActions((List<OFAction>)EasyMock.anyObject())).andReturn(msg);
+ EasyMock.expect(msg.setLengthU(EasyMock.anyShort())).andReturn(msg);
+ EasyMock.expect(msg.setOutPort(EasyMock.anyShort())).andReturn(msg).atLeastOnce();
+ EasyMock.expect(msg.getXid()).andReturn(1).anyTimes();
+ EasyMock.expect(msg.getType()).andReturn(OFType.FLOW_MOD).anyTimes();
+ EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(msg);
+
+ EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.FLOW_MOD))).andReturn(msg);
+
+ ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
+ EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
+ (TimeUnit)EasyMock.anyObject())).andReturn(null).once();
+ EasyMock.replay(executor);
+ EasyMock.expect(threadPoolService.getScheduledExecutor()).andReturn(executor);
+
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+ EasyMock.expect(sw.getStringId()).andReturn("1").anyTimes();
+ sw.flush(); // record the void call; the next line requires it exactly once
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(sw);
+
+ try {
+ EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.anyObject(OFMessage.class), EasyMock.eq(context)))
+ .andAnswer(new IAnswer<Boolean>() {
+ @Override
+ public Boolean answer() throws Throwable {
+ OFMessage msg = (OFMessage)EasyMock.getCurrentArguments()[1];
+ assertEquals(msg.getType(), OFType.FLOW_MOD); // the written message must be a FLOW_MOD
+ return true;
+ }
+ }).once();
+ } catch (IOException e1) {
+ fail("Failed in OFMessageDamper#write()");
+ }
+
+ endInitMock();
+ initPusher(1);
+
+ pusher.pushFlowEntry(sw, flowEntry1);
+
+ try {
+ Thread.sleep(1000); // give the started pusher one second to process the entry
+ } catch (InterruptedException e) {
+ fail("Failed to sleep");
+ }
+
+ EasyMock.verify(sw);
+
+ pusher.stop();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void beginInitMock() { // create the shared mocks and record the common expectations; endInitMock() replays them
+ context = EasyMock.createMock(FloodlightContext.class);
+ modContext = EasyMock.createMock(FloodlightModuleContext.class);
+ factory = EasyMock.createMock(BasicFactory.class);
+ damper = EasyMock.createMock(OFMessageDamper.class);
+ flProviderService = EasyMock.createMock(IFloodlightProviderService.class);
+ threadPoolService = EasyMock.createMock(IThreadPoolService.class);
+ flowService = EasyMock.createMock(IFlowService.class);
+
+ flowService.flowEntriesPushedToSwitch(EasyMock.anyObject(Collection.class)); // allowed any number of times
+ EasyMock.expectLastCall().anyTimes();
+
+ EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IThreadPoolService.class))) // each service is looked up once
+ .andReturn(threadPoolService).once();
+ EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IFloodlightProviderService.class)))
+ .andReturn(flProviderService).once();
+ EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IFlowService.class)))
+ .andReturn(flowService).once();
+ flProviderService.addOFMessageListener(EasyMock.eq(OFType.BARRIER_REPLY), // a FlowPusher registers for BARRIER_REPLY once
+ (FlowPusher) EasyMock.anyObject());
+ EasyMock.expectLastCall().once();
+ }
+
+ private void endInitMock() { // switch every mock created by beginInitMock() from record to replay mode
+ EasyMock.replay(flowService);
+ EasyMock.replay(threadPoolService);
+ EasyMock.replay(flProviderService);
+ EasyMock.replay(damper);
+ EasyMock.replay(factory);
+ EasyMock.replay(modContext);
+ EasyMock.replay(context);
+ }
+
+ private void initPusher(int num_thread) { // build, init and start the FlowPusher under test with the given constructor argument
+ pusher = new FlowPusher(num_thread);
+ pusher.init(context, modContext, factory, damper); // uses the mocks created by beginInitMock()
+ pusher.start();
+ }
+
+ private void prepareBarrier(IOFSwitch sw) { // record barrier-related expectations; call before sw and the shared mocks are replayed
+ OFBarrierRequest req = EasyMock.createMock(OFBarrierRequest.class);
+ req.setXid(EasyMock.anyInt()); // the transaction id is expected to be set exactly once
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(req.getXid()).andReturn(1).anyTimes();
+ EasyMock.expect(req.getType()).andReturn(OFType.BARRIER_REQUEST).anyTimes();
+ EasyMock.expect(req.getLength()).andReturn((short)100).anyTimes();
+ EasyMock.replay(req);
+ EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.BARRIER_REQUEST))).andReturn(req); // the factory hands out the mock request
+
+ ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
+ EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
+ (TimeUnit)EasyMock.anyObject())).andReturn(null).once();
+ EasyMock.replay(executor);
+ EasyMock.expect(threadPoolService.getScheduledExecutor()).andReturn(executor);
+
+ EasyMock.expect(sw.getNextTransactionId()).andReturn(1); // sw is still in record mode here
+ }
+
+}
diff --git a/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizerTest.java b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizerTest.java
new file mode 100644
index 0000000..5b1bbdd
--- /dev/null
+++ b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizerTest.java
@@ -0,0 +1,313 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import io.netty.util.concurrent.Future;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.onrc.onos.graph.GraphDBOperation;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
+import net.onrc.onos.ofcontroller.flowmanager.FlowDatabaseOperation;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.openflow.protocol.OFFlowMod;
+import org.openflow.protocol.OFMatch;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFStatisticsRequest;
+import org.openflow.protocol.OFType;
+import org.openflow.protocol.statistics.OFFlowStatisticsReply;
+import org.openflow.protocol.statistics.OFStatistics;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({FlowSynchronizer.class, GraphDBOperation.class, FlowDatabaseOperation.class})
+public class FlowSynchronizerTest {
+ private FlowPusher pusher;
+ private FlowSynchronizer sync;
+ private List<Long> idAdded;
+ private List<Long> idRemoved;
+
+ @Before
+ public void setUp() throws Exception { // build a FlowPusher mock that records added/removed flow entry IDs
+ idAdded = new ArrayList<Long>();
+ idRemoved = new ArrayList<Long>();
+
+ pusher = EasyMock.createMock(FlowPusher.class);
+ pusher.add(EasyMock.anyObject(IOFSwitch.class), EasyMock.anyObject(OFMessage.class));
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ OFMessage msg = (OFMessage)EasyMock.getCurrentArguments()[1];
+ if (msg.getType().equals(OFType.FLOW_MOD)) {
+ OFFlowMod fm = (OFFlowMod)msg;
+ if (fm.getCommand() == OFFlowMod.OFPFC_DELETE_STRICT) {
+ idRemoved.add(fm.getCookie()); // a strict-delete FLOW_MOD counts as a removal, keyed by its cookie
+ }
+ }
+ return null;
+ }
+ }).anyTimes();
+ pusher.pushFlowEntry(EasyMock.anyObject(IOFSwitch.class), EasyMock.anyObject(FlowEntry.class));
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ FlowEntry flow = (FlowEntry)EasyMock.getCurrentArguments()[1];
+ idAdded.add(flow.flowEntryId().value()); // a pushed FlowEntry counts as an addition, keyed by its entry ID
+ return null;
+ }
+ }).anyTimes();
+ EasyMock.replay(pusher);
+ }
+
+ @After
+ public void tearDown() throws Exception { // intentionally empty: setUp() recreates all state for each test
+ }
+
+ /**
+ * Test that synchronization adds and removes nothing when the DB and the
+ * switch's flow table hold the same entries.
+ */
+ @Test
+ public void testStable() {
+ // Create mock of flow table : flow 1
+ IOFSwitch sw = createMockSwitch(new long[] {1});
+
+ // Create mock of flow entries : flow 1
+ initMockGraph(new long[] {1});
+
+ // synchronize, then sleep 100 ms
+ doSynchronization(sw,100);
+
+ // check that no flow was added or removed
+ assertEquals(0, idAdded.size());
+ assertEquals(0, idRemoved.size());
+ }
+
+ /**
+ * Test that a flow is added when the DB has an extra FlowEntry.
+ */
+ @Test
+ public void testSingleAdd() {
+ // Create mock of flow table : empty
+ IOFSwitch sw = createMockSwitch(new long[] {});
+
+ // Create mock of flow entries : flow 1
+ initMockGraph(new long[] {1});
+
+ // synchronize, then sleep 100 ms
+ doSynchronization(sw,100);
+
+ // check that exactly flow 1 is installed and nothing is removed
+ assertEquals(1, idAdded.size());
+ assertTrue(idAdded.contains((long)1));
+ assertEquals(0, idRemoved.size());
+ }
+
+ /**
+ * Test that a flow is deleted when the switch has an extra FlowEntry.
+ */
+ @Test
+ public void testSingleDelete() {
+ // Create mock of flow table : flow 1
+ IOFSwitch sw = createMockSwitch(new long[] {1});
+
+ // Create mock of flow entries : empty
+ initMockGraph(new long[] {});
+
+ // synchronize, then sleep 100 ms
+ doSynchronization(sw,100);
+
+ // check that exactly flow 1 is deleted and nothing is added
+ assertEquals(0, idAdded.size());
+ assertEquals(1, idRemoved.size());
+ assertTrue(idRemoved.contains((long)1));
+ }
+
+ /**
+ * Test that the missing flows are added and the surplus flows are deleted
+ * when the flows in the DB only partly overlap the flows in the switch.
+ */
+ @Test
+ public void testMixed() {
+ // Create mock of flow table : flow 1,2,3
+ IOFSwitch sw = createMockSwitch(new long[] {1,2,3});
+
+ // Create mock of flow entries : flow 2,3,4,5
+ initMockGraph(new long[] {2,3,4,5});
+
+ // synchronize, then sleep 100 ms
+ doSynchronization(sw,100);
+
+ // check that two flows {4,5} are installed and one flow {1} is deleted
+ assertEquals(2, idAdded.size());
+ assertTrue(idAdded.contains((long)4));
+ assertTrue(idAdded.contains((long)5));
+ assertEquals(1, idRemoved.size());
+ assertTrue(idRemoved.contains((long)1));
+ }
+
+
+ @Test
+ public void testMassive() { // same scenario as testMixed, with 2000 flows on each side and 500 in common
+ // Create mock of flow table : flow 0-1999
+ long [] swIdList = new long [2000];
+ for (long i = 0; i < 2000; ++i) {
+ swIdList[(int)i] = i;
+ }
+ IOFSwitch sw = createMockSwitch(swIdList);
+
+ // Create mock of flow entries : flow 1500-3499
+ long [] dbIdList = new long [2000];
+ for (long i = 0; i < 2000; ++i) {
+ dbIdList[(int)i] = 1500 + i;
+ }
+ initMockGraph(dbIdList);
+
+ // synchronize, then sleep 3000 ms (longer than the 100 ms used by the small tests)
+ doSynchronization(sw, 3000);
+
+ // check that 1500 flows {2000,...,3499} are installed and 1500 flows {0,...,1499} are deleted
+ assertEquals(1500, idAdded.size());
+ for (long i = 2000; i < 3500; ++i) {
+ assertTrue(idAdded.contains(i));
+ }
+ assertEquals(1500, idRemoved.size());
+ for (long i = 0; i < 1500; ++i) {
+ assertTrue(idRemoved.contains(i));
+ }
+ }
+
+ /**
+ * Create a mock IOFSwitch whose getStatistics() reports one flow statistics reply per given cookie.
+ * @param cookieList List of FlowEntry IDs switch has.
+ * @return Mock object, already in replay mode.
+ */
+ private IOFSwitch createMockSwitch(long[] cookieList) {
+ IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+ EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+
+ List<OFStatistics> stats = new ArrayList<OFStatistics>();
+ for (long cookie : cookieList) {
+ stats.add(createReply(cookie));
+ }
+
+ @SuppressWarnings("unchecked")
+ Future<List<OFStatistics>> future = EasyMock.createMock(Future.class);
+ try {
+ EasyMock.expect(future.get()).andReturn(stats).once(); // the statistics may be fetched exactly once
+ } catch (InterruptedException e1) {
+ fail("Failed in Future#get()");
+ } catch (ExecutionException e1) {
+ fail("Failed in Future#get()");
+ }
+ EasyMock.replay(future);
+
+ try {
+ EasyMock.expect(sw.getStatistics(EasyMock.anyObject(OFStatisticsRequest.class)))
+ .andReturn(future).once();
+ } catch (IOException e) {
+ fail("Failed in IOFSwitch#getStatistics()");
+ }
+
+ EasyMock.replay(sw);
+ return sw;
+ }
+
+ /**
+ * Create a single OFFlowStatisticsReply with the given cookie, a default OFMatch and priority 1.
+ * @param cookie Cookie value, which indicates ID of FlowEntry installed to switch.
+ * @return Created object.
+ */
+ private OFFlowStatisticsReply createReply(long cookie) {
+ OFFlowStatisticsReply stat = new OFFlowStatisticsReply();
+ OFMatch match = new OFMatch();
+
+ stat.setCookie(cookie);
+ stat.setMatch(match);
+ stat.setPriority((short)1);
+
+ return stat;
+ }
+
+ /**
+ * Create mock GraphDBOperation and FlowDatabaseOperation to mock DB.
+ * @param idList List of FlowEntry IDs stored in DB.
+ */
+ private void initMockGraph(long[] idList) {
+ List<IFlowEntry> flowEntryList = new ArrayList<IFlowEntry>();
+
+ for (long id : idList) {
+ IFlowEntry entry = EasyMock.createMock(IFlowEntry.class);
+ EasyMock.expect(entry.getFlowEntryId()).andReturn(String.valueOf(id)).anyTimes();
+ EasyMock.replay(entry);
+ flowEntryList.add(entry);
+ }
+
+ ISwitchObject swObj = EasyMock.createMock(ISwitchObject.class); // the switch object that owns all mocked entries
+ EasyMock.expect(swObj.getFlowEntries()).andReturn(flowEntryList).once();
+ EasyMock.replay(swObj);
+
+ GraphDBOperation mockOp = PowerMock.createMock(GraphDBOperation.class);
+ EasyMock.expect(mockOp.searchSwitch(EasyMock.anyObject(String.class))).andReturn(swObj).once();
+
+ PowerMock.mockStatic(FlowDatabaseOperation.class); // extractFlowEntry() is static, so the class itself is mocked
+ for (IFlowEntry entry : flowEntryList) {
+ EasyMock.expect(FlowDatabaseOperation.extractFlowEntry(EasyMock.eq(entry)))
+ .andAnswer(new IAnswer<FlowEntry>() {
+ @Override
+ public FlowEntry answer() throws Throwable {
+ IFlowEntry iflow = (IFlowEntry)EasyMock.getCurrentArguments()[0];
+ long flowEntryId = Long.valueOf(iflow.getFlowEntryId());
+
+ FlowEntry flow = EasyMock.createMock(FlowEntry.class); // mock FlowEntry exposing only its ID
+ EasyMock.expect(flow.flowEntryId()).andReturn(new FlowEntryId(flowEntryId)).anyTimes();
+ EasyMock.replay(flow);
+ return flow;
+ }
+
+ }).anyTimes();
+ EasyMock.expect(mockOp.searchFlowEntry(EasyMock.eq(new FlowEntryId(entry.getFlowEntryId()))))
+ .andReturn(entry);
+ }
+ PowerMock.replay(FlowDatabaseOperation.class);
+ EasyMock.replay(mockOp);
+
+ try {
+ PowerMock.expectNew(GraphDBOperation.class, "").andReturn(mockOp); // new GraphDBOperation("") yields the mock
+ } catch (Exception e) {
+ fail("Failed to create GraphDBOperation");
+ }
+ PowerMock.replay(GraphDBOperation.class);
+ }
+
+ /**
+ * Instantiate FlowSynchronizer, call synchronize() for the switch, then sleep for {@code wait} milliseconds.
+ * @param sw Target IOFSwitch object
+ */
+ private void doSynchronization(IOFSwitch sw, long wait) {
+ sync = new FlowSynchronizer();
+ sync.init(pusher);
+ sync.synchronize(sw);
+
+ try {
+ Thread.sleep(wait); // presumably lets the synchronization finish before the caller asserts -- TODO confirm
+ } catch (InterruptedException e) {
+ fail("Failed to sleep");
+ }
+ }
+}
diff --git a/web/add_flow.py b/web/add_flow.py
index eed75f9..c621c30 100755
--- a/web/add_flow.py
+++ b/web/add_flow.py
@@ -488,6 +488,8 @@
if __name__ == "__main__":
usage_msg = "Usage: %s [Flags] <flow-id> <installer-id> <src-dpid> <src-port> <dest-dpid> <dest-port> [Flow Path Flags] [Match Conditions] [Actions]\n" % (sys.argv[0])
usage_msg = usage_msg + "\n"
+ usage_msg = usage_msg + " <flow-id> The Flow ID, or -1 if it should be assigned by ONOS\n"
+ usage_msg = usage_msg + "\n"
usage_msg = usage_msg + " Flags:\n"
usage_msg = usage_msg + " -m [monitorname] Monitor and maintain the installed shortest path(s)\n"
usage_msg = usage_msg + " If 'monitorname' is specified and is set to 'ONOS'\n"
diff --git a/web/flowsync.py b/web/flowsync.py
new file mode 100755
index 0000000..51399d5
--- /dev/null
+++ b/web/flowsync.py
@@ -0,0 +1,103 @@
+#! /usr/bin/env python
+
+
+import pprint
+import os
+import sys
+import subprocess
+import json
+import argparse
+import io
+import time
+
+from flask import Flask, json, Response, render_template, make_response, request
+
+## Global Var ##
+# Host and port interpolated into the controller REST URLs built by
+# synchronize() and interrupt().
+ControllerIP="127.0.0.1"
+ControllerPort=8080
+
+# When truthy, debug() prints its argument; 0 keeps debug output off.
+DEBUG=0
+# Pretty-printer instance; not referenced elsewhere in this script.
+pp = pprint.PrettyPrinter(indent=4)
+
+# Flask application object; this script only sets app.debug in its main section.
+app = Flask(__name__)
+
+## Worker Functions ##
+# Print an error message.
+# txt: the value to print; it is formatted with '%s' and printed with the
+# Python 2 print statement (i.e. to standard output).
+def log_error(txt):
+ print '%s' % (txt)
+
+# Print a debug message, but only when the module-level DEBUG flag is truthy.
+# txt: the value to print; it is formatted with '%s'.
+def debug(txt):
+ if DEBUG:
+ print '%s' % (txt)
+
+# @app.route("/wm/fprog/synchronizer/sync/<dpid>/json")
+# Sample output:
+# "true"
+def _query_controller(url):
+    """Fetch `url` with curl and return the reply with surrounding whitespace removed.
+
+    Logs "Controller IF has issue" and exits with status 1 when curl cannot
+    be started, exits with a non-zero status, or produces an empty reply, so
+    that an unreachable controller is never reported as a success.
+    """
+    try:
+        # An argument list (no shell) keeps a dpid taken from the command
+        # line from being interpreted as shell syntax, and check_output
+        # waits for curl and closes the pipe.
+        reply = subprocess.check_output(["curl", "-s", url],
+                                        universal_newlines=True)
+    except (OSError, subprocess.CalledProcessError):
+        log_error("Controller IF has issue")
+        sys.exit(1)
+
+    reply = reply.strip()
+    if not reply:
+        log_error("Controller IF has issue")
+        sys.exit(1)
+    return reply
+
+
+def synchronize(dpid):
+    """Ask the controller to start flow synchronization of switch `dpid`.
+
+    Prints a failure message and returns when the controller answers
+    "false"; prints a confirmation otherwise.  Exits with status 1 when the
+    controller cannot be queried.
+    """
+    url = "http://%s:%s/wm/fprog/synchronizer/sync/%s/json" % (ControllerIP, ControllerPort, dpid)
+    debug("synchronize curl -s \"%s\"" % url)
+
+    result = _query_controller(url)
+    debug("result %s" % result)
+    # The sample output documented for this route is shown in double quotes,
+    # so accept the answer both with and without them.
+    if result.strip('"') == "false":
+        print("Failed to synchronize")
+        return
+
+    print("Synchronization of switch %s has successfully begun" % (dpid))
+
+# @app.route("/wm/fprog/synchronizer/interrupt/<dpid>/json")
+# Sample output:
+# "true"
+def interrupt(dpid):
+    """Ask the controller to interrupt flow synchronization of switch `dpid`.
+
+    Prints a failure message and returns when the controller answers
+    "false"; prints a confirmation otherwise.  Exits with status 1 when the
+    controller cannot be queried.
+    """
+    url = "http://%s:%s/wm/fprog/synchronizer/interrupt/%s/json" % (ControllerIP, ControllerPort, dpid)
+    debug("interrupt curl -s \"%s\"" % url)
+
+    result = _query_controller(url)
+    debug("result %s" % result)
+    # Same quoting tolerance as in synchronize().
+    if result.strip('"') == "false":
+        print("Failed to interrupt synchronization")
+        return
+
+    print("Synchronization of switch %s has been successfully interrupted" % (dpid))
+
+
+if __name__ == "__main__":
+    # Help text shown for -h/--help and for any malformed invocation.
+    usage_msg = "".join([
+        "Usage:\n",
+        "%s sync <dpid> : Start synchronization of the switch\n" % (sys.argv[0]),
+        " interrupt <dpid> : Interrupt synchronization of the switch\n",
+    ])
+
+    app.debug = True
+
+    cli_args = sys.argv[1:]
+
+    # Usage info
+    if cli_args and cli_args[0] in ("-h", "--help"):
+        print(usage_msg)
+        exit(0)
+
+    # Check arguments: a sub-command is mandatory
+    if not cli_args:
+        log_error(usage_msg)
+        exit(1)
+
+    # Do the work: every known sub-command takes exactly one <dpid> operand
+    handlers = {"sync": synchronize, "interrupt": interrupt}
+    handler = handlers.get(cli_args[0])
+    if handler is None or len(cli_args) < 2:
+        log_error(usage_msg)
+        exit(1)
+    handler(cli_args[1])
+
\ No newline at end of file
diff --git a/web/get_flow.py b/web/get_flow.py
index c45d853..72fbd4a 100755
--- a/web/get_flow.py
+++ b/web/get_flow.py
@@ -250,48 +250,6 @@
print_flow_path(parsedResult)
-def get_installer_flow_paths(installer_id, v1, p1, v2, p2):
- try:
- command = "curl -s \"http://%s:%s/wm/flow/getall-by-installer-id/%s/%s/%s/%s/%s/json\"" % (ControllerIP, ControllerPort, installer_id, v1, p1, v2, p2)
- debug("get_installer_flow_paths %s" % command)
-
- result = os.popen(command).read()
- debug("result %s" % result)
- if len(result) == 0:
- print "No Flows found"
- return;
-
- parsedResult = json.loads(result)
- debug("parsed %s" % parsedResult)
- except:
- log_error("Controller IF has issue")
- exit(1)
-
- for flowPath in parsedResult:
- print_flow_path(flowPath)
-
-
-def get_endpoints_flow_paths(v1, p1, v2, p2):
- try:
- command = "curl -s \"http://%s:%s/wm/flow/getall-by-endpoints/%s/%s/%s/%s/json\"" % (ControllerIP, ControllerPort, v1, p1, v2, p2)
- debug("get_endpoints_flow_paths %s" % command)
-
- result = os.popen(command).read()
- debug("result %s" % result)
- if len(result) == 0:
- print "No Flows found"
- return;
-
- parsedResult = json.loads(result)
- debug("parsed %s" % parsedResult)
- except:
- log_error("Controller IF has issue")
- exit(1)
-
- for flowPath in parsedResult:
- print_flow_path(flowPath)
-
-
def get_all_flow_paths():
try:
command = "curl -s \"http://%s:%s/wm/flow/getall/json\"" % (ControllerIP, ControllerPort)
@@ -316,9 +274,7 @@
usage_msg1 = "Usage:\n"
usage_msg2 = "%s <flow_id> : Print flow with Flow ID of <flow_id>\n" % (sys.argv[0])
usage_msg3 = " all : Print all flows\n"
- usage_msg4 = " installer <installer-id> <src-dpid> <src-port> <dest-dpid> <dest-port>\n"
- usage_msg5 = " endpoints <src-dpid> <src-port> <dest-dpid> <dest-port>"
- usage_msg = usage_msg1 + usage_msg2 + usage_msg3 + usage_msg4 + usage_msg5;
+ usage_msg = usage_msg1 + usage_msg2 + usage_msg3;
# app.debug = False;
@@ -335,17 +291,5 @@
# Do the work
if sys.argv[1] == "all":
get_all_flow_paths()
- elif sys.argv[1] == "installer":
- if len(sys.argv) < 7:
- log_error(usage_msg)
- exit(1)
- get_installer_flow_paths(sys.argv[2], sys.argv[3], sys.argv[4],
- sys.argv[5], sys.argv[6])
- elif sys.argv[1] == "endpoints":
- if len(sys.argv) < 6:
- log_error(usage_msg)
- exit(1)
- get_endpoints_flow_paths(sys.argv[2], sys.argv[3], sys.argv[4],
- sys.argv[5])
else:
get_flow_path(sys.argv[1])
diff --git a/web/pusher.py b/web/pusher.py
index f53e3ea..2a3528b 100755
--- a/web/pusher.py
+++ b/web/pusher.py
@@ -29,14 +29,12 @@
if DEBUG:
print '%s' % (txt)
-# @app.route("/wm/fprog/setrate/<dpid>/<rate>/json")
+# @app.route("/wm/fprog/pusher/setrate/<dpid>/<rate>/json")
# Sample output:
# "true"
-
-
def set_rate(dpid,rate):
try:
- command = "curl -s \"http://%s:%s/wm/fprog/setrate/%s/%s/json\"" % (ControllerIP, ControllerPort, dpid, rate)
+ command = "curl -s \"http://%s:%s/wm/fprog/pusher/setrate/%s/%s/json\"" % (ControllerIP, ControllerPort, dpid, rate)
debug("set_rate %s" % command)
result = os.popen(command).read()
@@ -50,9 +48,12 @@
print "Sending rate to %s is successfully set to %s" % (dpid, rate)
+# @app.route("/wm/fprog/pusher/suspend/<dpid>/json")
+# Sample output:
+# "true"
def suspend(dpid):
try:
- command = "curl -s \"http://%s:%s/wm/fprog/suspend/%s/json\"" % (ControllerIP, ControllerPort, dpid)
+ command = "curl -s \"http://%s:%s/wm/fprog/pusher/suspend/%s/json\"" % (ControllerIP, ControllerPort, dpid)
debug("suspend %s" % command)
result = os.popen(command).read()
@@ -66,9 +67,12 @@
print "DPID %s is successfully suspended" % dpid
+# @app.route("/wm/fprog/pusher/resume/<dpid>/json")
+# Sample output:
+# "true"
def resume(dpid):
try:
- command = "curl -s \"http://%s:%s/wm/fprog/resume/%s/json\"" % (ControllerIP, ControllerPort, dpid)
+ command = "curl -s \"http://%s:%s/wm/fprog/pusher/resume/%s/json\"" % (ControllerIP, ControllerPort, dpid)
debug("resume %s" % command)
result = os.popen(command).read()
@@ -82,9 +86,12 @@
print "DPID %s is successfully resumed" % dpid
+# @app.route("/wm/fprog/pusher/barrier/<dpid>/json")
+# Sample output:
+# "{"version":1,"type":"BARRIER_REPLY","length":8,"xid":4,"lengthU":8}"
def barrier(dpid):
try:
- command = "curl -s \"http://%s:%s/wm/fprog/barrier/%s/json\"" % (ControllerIP, ControllerPort, dpid)
+ command = "curl -s \"http://%s:%s/wm/fprog/pusher/barrier/%s/json\"" % (ControllerIP, ControllerPort, dpid)
debug("barrier %s" % command)
result = os.popen(command).read()