Merge remote-tracking branch 'upstream/master' into syncdev
Conflicts:
src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
diff --git a/cluster-mgmt/bin/cho-link-failure.sh b/cluster-mgmt/bin/cho-link-failure.sh
index 6c5f128..4db887a 100755
--- a/cluster-mgmt/bin/cho-link-failure.sh
+++ b/cluster-mgmt/bin/cho-link-failure.sh
@@ -16,7 +16,6 @@
dsh -w ${basename}1 "cd ONOS/scripts; ./all-linkup.sh"
echo "clean up flow"
dsh -w ${basename}1 "cd ONOS/web; ./delete_flow.py 1 100"
-dsh -w ${basename}1 "cd ONOS/web; ./clear_flow.py 1 100"
sleep 1
dsh -w ${basename}1 "cd ONOS/web; ./get_flow.py all"
dsh "cd ONOS/scripts; ./delflow.sh"
diff --git a/cluster-mgmt/bin/demo-reset-hw.sh b/cluster-mgmt/bin/demo-reset-hw.sh
index 8c586f5..15f97e1 100755
--- a/cluster-mgmt/bin/demo-reset-hw.sh
+++ b/cluster-mgmt/bin/demo-reset-hw.sh
@@ -9,7 +9,6 @@
echo "cleanup excess flows"
$DIR/web/delete_flow.py 201 300
-$DIR/web/clear_flow.py 201 300
echo "cleanup excess flows done"
echo "Adding 200 flows"
$DIR/web/add_flow.py -m onos -f $DIR/web/flowdef_demo_start.txt
diff --git a/cluster-mgmt/bin/test-link-failure.sh b/cluster-mgmt/bin/test-link-failure.sh
index 6c5f128..4db887a 100755
--- a/cluster-mgmt/bin/test-link-failure.sh
+++ b/cluster-mgmt/bin/test-link-failure.sh
@@ -16,7 +16,6 @@
dsh -w ${basename}1 "cd ONOS/scripts; ./all-linkup.sh"
echo "clean up flow"
dsh -w ${basename}1 "cd ONOS/web; ./delete_flow.py 1 100"
-dsh -w ${basename}1 "cd ONOS/web; ./clear_flow.py 1 100"
sleep 1
dsh -w ${basename}1 "cd ONOS/web; ./get_flow.py all"
dsh "cd ONOS/scripts; ./delflow.sh"
diff --git a/scripts/demo-reset-sw.sh b/scripts/demo-reset-sw.sh
index 65a2ff1..e6cabae 100755
--- a/scripts/demo-reset-sw.sh
+++ b/scripts/demo-reset-sw.sh
@@ -6,7 +6,6 @@
$DIR/scripts/all-linkup.sh
echo "Delete Flows"
$DIR/web/delete_flow.py 201 300
-$DIR/web/clear_flow.py 201 300
echo "Adding Flows"
$DIR/web/add_flow.py -m onos -f $DIR/web/flowdef_demo_start.txt
ssh -i ~/.ssh/onlabkey.pem ${basename}5 'ONOS/start-onos.sh stop'
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
index e9e2bd1..fd9d535 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
@@ -121,19 +121,24 @@
public void removeDevice(IDeviceObject deviceObject) {
String deviceMac = deviceObject.getMACAddress();
- for (IIpv4Address ipv4AddressVertex : deviceObject.getIpv4Addresses()) {
- ope.removeIpv4Address(ipv4AddressVertex);
- }
+ removeDeviceImpl(deviceObject);
try {
- ope.removeDevice(deviceObject);
ope.commit();
- log.error("DeviceStorage:removeDevice mac:{} done", deviceMac);
+ log.debug("DeviceStorage:removeDevice mac:{} done", deviceMac);
} catch (TitanException e) {
ope.rollback();
log.error("DeviceStorage:removeDevice mac:{} failed", deviceMac);
}
}
+
+ public void removeDeviceImpl(IDeviceObject deviceObject) {
+ for (IIpv4Address ipv4AddressVertex : deviceObject.getIpv4Addresses()) {
+ ope.removeIpv4Address(ipv4AddressVertex);
+ }
+
+ ope.removeDevice(deviceObject);
+ }
/***
* This function is for getting the Device from the DB by Mac address of the device.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
index 59f59b7..dcfdc73 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
@@ -147,28 +147,28 @@
}
for (OFPhysicalPort port: sw.getPorts()) {
- addPort(dpid, port);
- /*IPortObject p = op.searchPort(dpid, port.getPortNumber());
- if (p != null) {
- log.debug("SwitchStorage:addPort dpid:{} port:{} exists", dpid, port.getPortNumber());
- setPortStateImpl(p, port.getState(), port.getName());
- p.setState("ACTIVE");
- if (curr.getPort(port.getPortNumber()) == null) {
- // The port exists but the switch has no "on" link to it
- curr.addPort(p);
- }
- } else {
- p = addPortImpl(curr, port.getPortNumber());
- setPortStateImpl(p, port.getState(), port.getName());
- } */
+ //addPort(dpid, port);
+ addPortImpl(curr, port);
+
}
+
+ // XXX for now delete devices when we change a port to prevent
+ // having stale devices.
+ DeviceStorageImpl deviceStorage = new DeviceStorageImpl();
+ deviceStorage.init("");
+ for (IPortObject portObject : curr.getPorts()) {
+ for (IDeviceObject deviceObject : portObject.getDevices()) {
+ // deviceStorage must remove the object obtained through its own
+ // FramedGraph; it can't use the objects we hold here
+ deviceStorage.removeDeviceImpl(deviceStorage.getDeviceByMac(deviceObject.getMACAddress()));
+ }
+ }
+
op.commit();
success = true;
} catch (Exception e) {
op.rollback();
- //e.printStackTrace();
- log.error("SwitchStorage:addSwitch dpid:{} failed: {}", dpid);
- log.error("switch write error", e);
+ log.error("SwitchStorage:addSwitch dpid:{} failed", dpid, e);
}
return success;
@@ -292,9 +292,6 @@
public boolean addPort(String dpid, OFPhysicalPort phport) {
boolean success = false;
- DeviceStorageImpl deviceStorage = new DeviceStorageImpl();
- deviceStorage.init("");
-
if(((OFPortConfig.OFPPC_PORT_DOWN.getValue() & phport.getConfig()) > 0) ||
((OFPortState.OFPPS_LINK_DOWN.getValue() & phport.getState()) > 0)) {
// just dispatch to deletePort()
@@ -307,27 +304,17 @@
ISwitchObject sw = op.searchSwitch(dpid);
if (sw != null) {
- IPortObject p = sw.getPort(phport.getPortNumber());
- log.info("SwitchStorage:addPort dpid:{} port:{}", dpid, phport.getPortNumber());
- if (p != null) {
- setPortStateImpl(p, phport.getState(), phport.getName());
-
- if (sw.getPort(phport.getPortNumber()) == null) {
- // The port exists but the switch has no "on" link to it
- sw.addPort(p);
- }
-
- // XXX for now delete devices when we change a port to prevent
- // having stale devices.
- for (IDeviceObject deviceObject : p.getDevices()) {
- deviceStorage.removeDevice(deviceObject);
- }
-
- log.error("SwitchStorage:addPort dpid:{} port:{} exists setting as ACTIVE", dpid, phport.getPortNumber());
- } else {
- addPortImpl(sw, phport.getPortNumber());
- setPortStateImpl(p, phport.getState(), phport.getName());
- }
+ IPortObject portObject = addPortImpl(sw, phport);
+
+ // XXX for now delete devices when we change a port to prevent
+ // having stale devices.
+ DeviceStorageImpl deviceStorage = new DeviceStorageImpl();
+ deviceStorage.init("");
+
+ for (IDeviceObject deviceObject : portObject.getDevices()) {
+ deviceStorage.removeDevice(deviceObject);
+ }
+
op.commit();
success = true;
} else {
@@ -337,8 +324,8 @@
op.rollback();
e.printStackTrace();
log.error("SwitchStorage:addPort dpid:{} port:{} failed", dpid, phport.getPortNumber());
- }
-
+ }
+
return success;
}
@@ -430,12 +417,59 @@
}
}
+
+ private IPortObject addPortImpl(ISwitchObject sw, OFPhysicalPort phport) {
+ IPortObject portObject = op.searchPort(sw.getDPID(), phport.getPortNumber());
+
+ log.info("SwitchStorage:addPort dpid:{} port:{}",
+ sw.getDPID(), phport.getPortNumber());
+
+ if (portObject != null) {
+ setPortStateImpl(portObject, phport.getState(), phport.getName());
+ portObject.setState("ACTIVE");
+
+ // This is a convoluted way of checking whether the port is attached,
+ // but doing it this way avoids using the
+ // ISwitchObject.getPort method, which uses a GremlinGroovy query
+ // and takes forever.
+ boolean attached = false;
+ for (IPortObject portsOnSwitch : sw.getPorts()) {
+ if (portsOnSwitch.getPortId() == portObject.getPortId()) {
+ attached = true;
+ break;
+ }
+ }
+
+ if (!attached) {
+ sw.addPort(portObject);
+ }
+
+ /*
+ if (sw.getPort(phport.getPortNumber()) == null) {
+ // The port exists but the switch has no "on" link to it
+ sw.addPort(portObject);
+ }*/
+
+ log.info("SwitchStorage:addPort dpid:{} port:{} exists setting as ACTIVE",
+ sw.getDPID(), phport.getPortNumber());
+ } else {
+ //addPortImpl(sw, phport.getPortNumber());
+ portObject = op.newPort(sw.getDPID(), phport.getPortNumber());
+ portObject.setState("ACTIVE");
+ setPortStateImpl(portObject, phport.getState(), phport.getName());
+ sw.addPort(portObject);
+ log.info("SwitchStorage:addPort dpid:{} port:{} done",
+ sw.getDPID(), phport.getPortNumber());
+ }
+
+ return portObject;
+ }
// TODO There's an issue here where a port with that ID could already
// exist when we try to add this one (because it's left over from an
// old topology). We need to remove an old port with the same ID when
// we add the new port. Also it seems that old ports like this are
// never cleaned up and will remain in the DB in the ACTIVE state forever.
- private IPortObject addPortImpl(ISwitchObject sw, short portNum) {
+ /*private IPortObject addPortImpl(ISwitchObject sw, short portNum) {
IPortObject p = op.newPort(sw.getDPID(), portNum);
p.setState("ACTIVE");
sw.addPort(p);
@@ -443,7 +477,7 @@
sw.getDPID(), portNum);
return p;
- }
+ }*/
private void setPortStateImpl(IPortObject port, Integer state, String desc) {
if (port != null) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index f623541..a1216f4 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -189,8 +189,7 @@
//
// Assign the FlowEntry ID.
//
- if ((flowEntry.flowEntryId() == null) ||
- (flowEntry.flowEntryId().value() == 0)) {
+ if (! flowEntry.isValidFlowEntryId()) {
long id = flowManager.getNextFlowEntryId();
flowEntry.setFlowEntryId(new FlowEntryId(id));
}
@@ -370,122 +369,6 @@
* @return true on success, otherwise false.
*/
static boolean deleteAllFlows(GraphDBOperation dbHandler) {
- final ConcurrentLinkedQueue<FlowId> concurrentAllFlowIds =
- new ConcurrentLinkedQueue<FlowId>();
-
- // Get all Flow IDs
- Iterable<IFlowPath> allFlowPaths = dbHandler.getAllFlowPaths();
- for (IFlowPath flowPathObj : allFlowPaths) {
- if (flowPathObj == null)
- continue;
- String flowIdStr = flowPathObj.getFlowId();
- if (flowIdStr == null)
- continue;
- FlowId flowId = new FlowId(flowIdStr);
- concurrentAllFlowIds.add(flowId);
- }
-
- // Delete all flows one-by-one
- for (FlowId flowId : concurrentAllFlowIds)
- deleteFlow(dbHandler, flowId);
-
- /*
- * TODO: A faster mechanism to delete the Flow Paths by using
- * a number of threads. Commented-out for now.
- */
- /*
- //
- // Create the threads to delete the Flow Paths
- //
- List<Thread> threads = new LinkedList<Thread>();
- for (int i = 0; i < 10; i++) {
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
- FlowId flowId = concurrentAllFlowIds.poll();
- if (flowId == null)
- return;
- deleteFlow(dbHandler, flowId);
- }
- }}, "Delete All Flow Paths");
- threads.add(thread);
- }
-
- // Start processing
- for (Thread thread : threads) {
- thread.start();
- }
-
- // Wait for all threads to complete
- for (Thread thread : threads) {
- try {
- thread.join();
- } catch (InterruptedException e) {
- log.debug("Exception waiting for a thread to delete a Flow Path: ", e);
- }
- }
- */
-
- return true;
- }
-
- /**
- * Delete a previously added flow.
- *
- * @param dbHandler the Graph Database handler to use.
- * @param flowId the Flow ID of the flow to delete.
- * @return true on success, otherwise false.
- */
- static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId) {
- IFlowPath flowObj = null;
- //
- // We just mark the entries for deletion,
- // and let the switches remove each individual entry after
- // it has been removed from the switches.
- //
- try {
- flowObj = dbHandler.searchFlowPath(flowId);
- } catch (Exception e) {
- // TODO: handle exceptions
- dbHandler.rollback();
- log.error(":deleteFlow FlowId:{} failed", flowId.toString());
- return false;
- }
- if (flowObj == null) {
- dbHandler.commit();
- return true; // OK: No such flow
- }
-
- //
- // Find and mark for deletion all Flow Entries,
- // and the Flow itself.
- //
- flowObj.setFlowPathUserState("FP_USER_DELETE");
- Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
- boolean empty = true; // TODO: an ugly hack
- for (IFlowEntry flowEntryObj : flowEntries) {
- empty = false;
- // flowObj.removeFlowEntry(flowEntryObj);
- // conn.utils().removeFlowEntry(conn, flowEntryObj);
- flowEntryObj.setUserState("FE_USER_DELETE");
- flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
- }
- // Remove from the database empty flows
- if (empty)
- dbHandler.removeFlowPath(flowObj);
- dbHandler.commit();
-
- return true;
- }
-
- /**
- * Clear the state for all previously added flows.
- *
- * @param dbHandler the Graph Database handler to use.
- * @return true on success, otherwise false.
- */
- static boolean clearAllFlows(GraphDBOperation dbHandler) {
List<FlowId> allFlowIds = new LinkedList<FlowId>();
// Get all Flow IDs
@@ -500,29 +383,29 @@
allFlowIds.add(flowId);
}
- // Clear all flows one-by-one
+ // Delete all flows one-by-one
for (FlowId flowId : allFlowIds) {
- clearFlow(dbHandler, flowId);
+ deleteFlow(dbHandler, flowId);
}
return true;
}
/**
- * Clear the state for a previously added flow.
+ * Delete a previously added flow.
*
* @param dbHandler the Graph Database handler to use.
- * @param flowId the Flow ID of the flow to clear.
+ * @param flowId the Flow ID of the flow to delete.
* @return true on success, otherwise false.
*/
- static boolean clearFlow(GraphDBOperation dbHandler, FlowId flowId) {
+ static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId) {
IFlowPath flowObj = null;
try {
flowObj = dbHandler.searchFlowPath(flowId);
} catch (Exception e) {
// TODO: handle exceptions
dbHandler.rollback();
- log.error(":clearFlow FlowId:{} failed", flowId.toString());
+ log.error(":deleteFlow FlowId:{} failed", flowId.toString());
return false;
}
if (flowObj == null) {
@@ -706,9 +589,9 @@
* @param maxFlows the maximum number of flows to be returned.
* @return the Flow Paths if found, otherwise null.
*/
- static ArrayList<IFlowPath> getAllFlowsSummary(GraphDBOperation dbHandler,
- FlowId flowId,
- int maxFlows) {
+ static ArrayList<FlowPath> getAllFlowsSummary(GraphDBOperation dbHandler,
+ FlowId flowId,
+ int maxFlows) {
//
// TODO: The implementation below is not optimal:
// We fetch all flows, and then return only the subset that match
@@ -716,61 +599,32 @@
// We should use the appropriate Titan/Gremlin query to filter-out
// the flows as appropriate.
//
- ArrayList<IFlowPath> flowPathsWithoutFlowEntries =
- getAllFlowsWithoutFlowEntries(dbHandler);
-
- Collections.sort(flowPathsWithoutFlowEntries,
- new Comparator<IFlowPath>() {
- @Override
- public int compare(IFlowPath first, IFlowPath second) {
- long result =
- new FlowId(first.getFlowId()).value()
- - new FlowId(second.getFlowId()).value();
- if (result > 0) {
- return 1;
- } else if (result < 0) {
- return -1;
- } else {
- return 0;
- }
- }
- }
- );
-
- return flowPathsWithoutFlowEntries;
+ ArrayList<FlowPath> flowPaths = getAllFlowsWithDataPathSummary(dbHandler);
+ Collections.sort(flowPaths);
+ return flowPaths;
}
/**
- * Get all Flows information, without the associated Flow Entries.
+ * Get all Flows information, with Data Path summary for the Flow Entries.
*
* @param dbHandler the Graph Database handler to use.
- * @return all Flows information, without the associated Flow Entries.
+ * @return all Flows information, with Data Path summary for the Flow
+ * Entries.
*/
- static ArrayList<IFlowPath> getAllFlowsWithoutFlowEntries(GraphDBOperation dbHandler) {
- Iterable<IFlowPath> flowPathsObj = null;
- ArrayList<IFlowPath> flowPathsObjArray = new ArrayList<IFlowPath>();
+ static ArrayList<FlowPath> getAllFlowsWithDataPathSummary(GraphDBOperation dbHandler) {
+ ArrayList<FlowPath> flowPaths = getAllFlows(dbHandler);
- // TODO: Remove this op.commit() flow, because it is not needed?
- dbHandler.commit();
+ // Truncate each Flow Path and Flow Entry
+ for (FlowPath flowPath : flowPaths) {
+ flowPath.setFlowEntryMatch(null);
+ flowPath.setFlowEntryActions(null);
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ flowEntry.setFlowEntryMatch(null);
+ flowEntry.setFlowEntryActions(null);
+ }
+ }
- try {
- flowPathsObj = dbHandler.getAllFlowPaths();
- } catch (Exception e) {
- // TODO: handle exceptions
- dbHandler.rollback();
- log.error(":getAllFlowPaths failed");
- return flowPathsObjArray; // No Flows found
- }
- if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
- return flowPathsObjArray; // No Flows found
- }
-
- for (IFlowPath flowObj : flowPathsObj)
- flowPathsObjArray.add(flowObj);
-
- // conn.endTx(Transaction.COMMIT);
-
- return flowPathsObjArray;
+ return flowPaths;
}
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 0927e49..f427beb 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -7,7 +7,8 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.Executors;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -27,7 +28,6 @@
import net.onrc.onos.ofcontroller.floodlightlistener.INetworkGraphService;
import net.onrc.onos.ofcontroller.flowmanager.web.FlowWebRoutable;
import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
-import net.onrc.onos.ofcontroller.topology.ITopologyNetService;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.util.*;
@@ -39,13 +39,6 @@
* Flow Manager class for handling the network flows.
*/
public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
-
- //
- // TODO: A temporary variable to switch between the poll-based and
- // notification mechanism for the Flow Manager.
- //
- private final static boolean enableNotifications = false;
-
// flag to use FlowPusher instead of FlowSwitchOperation/MessageDamper
private final static boolean enableFlowPusher = false;
@@ -53,7 +46,6 @@
protected GraphDBOperation dbHandlerInner;
protected volatile IFloodlightProviderService floodlightProvider;
- protected volatile ITopologyNetService topologyNetService;
protected volatile IDatagridService datagridService;
protected IRestApiService restApi;
protected FloodlightModuleContext context;
@@ -74,309 +66,14 @@
private static Random randomGenerator = new Random();
private static int nextFlowEntryIdPrefix = 0;
private static int nextFlowEntryIdSuffix = 0;
- private static long nextFlowEntryId = 0;
/** The logger. */
private final static Logger log = LoggerFactory.getLogger(FlowManager.class);
- // The periodic task(s)
- private ScheduledExecutorService mapReaderScheduler;
- private ScheduledExecutorService shortestPathReconcileScheduler;
-
- /**
- * Periodic task for reading the Flow Entries and pushing changes
- * into the switches.
- */
- final Runnable mapReader = new Runnable() {
- public void run() {
- try {
- runImpl();
- } catch (Exception e) {
- log.debug("Exception processing All Flow Entries from the Network MAP: ", e);
- dbHandlerInner.rollback();
- return;
- }
- }
-
- private void runImpl() {
- long startTime = System.nanoTime();
- int counterAllFlowEntries = 0;
- int counterMyNotUpdatedFlowEntries = 0;
-
- if (floodlightProvider == null) {
- log.debug("FloodlightProvider service not found!");
- return;
- }
- Map<Long, IOFSwitch> mySwitches =
- floodlightProvider.getSwitches();
- if (mySwitches.isEmpty()) {
- log.trace("No switches controlled");
- return;
- }
- LinkedList<IFlowEntry> addFlowEntries =
- new LinkedList<IFlowEntry>();
- LinkedList<IFlowEntry> deleteFlowEntries =
- new LinkedList<IFlowEntry>();
-
- //
- // Fetch all Flow Entries which need to be updated and select
- // only my Flow Entries that need to be updated into the
- // switches.
- //
- Iterable<IFlowEntry> allFlowEntries =
- dbHandlerInner.getAllSwitchNotUpdatedFlowEntries();
- for (IFlowEntry flowEntryObj : allFlowEntries) {
- log.debug("flowEntryobj : {}", flowEntryObj);
-
- counterAllFlowEntries++;
-
- String dpidStr = flowEntryObj.getSwitchDpid();
- if (dpidStr == null)
- continue;
- Dpid dpid = new Dpid(dpidStr);
- IOFSwitch mySwitch = mySwitches.get(dpid.value());
- if (mySwitch == null)
- continue; // Ignore the entry: not my switch
-
- IFlowPath flowObj =
- dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null)
- continue; // Should NOT happen
- if (flowObj.getFlowId() == null)
- continue; // Invalid entry
-
- //
- // NOTE: For now we process the DELETE before the ADD
- // to cover the more common scenario.
- // TODO: This is error prone and needs to be fixed!
- //
- String userState = flowEntryObj.getUserState();
- if (userState == null)
- continue;
- if (userState.equals("FE_USER_DELETE")) {
- // An entry that needs to be deleted.
- deleteFlowEntries.add(flowEntryObj);
- installFlowEntry(mySwitch, flowObj, flowEntryObj);
- } else {
- addFlowEntries.add(flowEntryObj);
- }
- counterMyNotUpdatedFlowEntries++;
- }
-
- log.debug("addFlowEntries : {}", addFlowEntries);
-
- //
- // Process the Flow Entries that need to be added
- //
- for (IFlowEntry flowEntryObj : addFlowEntries) {
- IFlowPath flowObj =
- dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null)
- continue; // Should NOT happen
- if (flowObj.getFlowId() == null)
- continue; // Invalid entry
-
- Dpid dpid = new Dpid(flowEntryObj.getSwitchDpid());
- IOFSwitch mySwitch = mySwitches.get(dpid.value());
- if (mySwitch == null)
- continue; // Shouldn't happen
- installFlowEntry(mySwitch, flowObj, flowEntryObj);
- }
-
- //
- // Delete all Flow Entries marked for deletion from the
- // Network MAP.
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and delete the Flow Entries after the
- // Barrier message is received.
- //
- while (! deleteFlowEntries.isEmpty()) {
- IFlowEntry flowEntryObj = deleteFlowEntries.poll();
- IFlowPath flowObj =
- dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null) {
- log.debug("Did not find FlowPath to be deleted");
- continue;
- }
- flowObj.removeFlowEntry(flowEntryObj);
- dbHandlerInner.removeFlowEntry(flowEntryObj);
- }
-
- dbHandlerInner.commit();
-
- long estimatedTime = System.nanoTime() - startTime;
- double rate = 0.0;
- if (estimatedTime > 0)
- rate = ((double)counterAllFlowEntries * 1000000000) / estimatedTime;
- String logMsg = "MEASUREMENT: Processed AllFlowEntries: " +
- counterAllFlowEntries + " MyNotUpdatedFlowEntries: " +
- counterMyNotUpdatedFlowEntries + " in " +
- (double)estimatedTime / 1000000000 + " sec: " +
- rate + " paths/s";
- log.debug(logMsg);
- }
- };
-
- /**
- * Periodic task for reading the Flow Paths and recomputing the
- * shortest paths.
- */
- final Runnable shortestPathReconcile = new Runnable() {
- public void run() {
- try {
- runImpl();
- } catch (Exception e) {
- log.debug("Exception processing All Flows from the Network MAP: ", e);
- dbHandlerInner.rollback();
- return;
- }
- }
-
- private void runImpl() {
- long startTime = System.nanoTime();
- int counterAllFlowPaths = 0;
- int counterMyFlowPaths = 0;
-
- if (floodlightProvider == null) {
- log.debug("FloodlightProvider service not found!");
- return;
- }
- Map<Long, IOFSwitch> mySwitches =
- floodlightProvider.getSwitches();
- if (mySwitches.isEmpty()) {
- log.trace("No switches controlled");
- return;
- }
- LinkedList<IFlowPath> deleteFlows = new LinkedList<IFlowPath>();
-
- //
- // Fetch and recompute the Shortest Path for those
- // Flow Paths this controller is responsible for.
- //
- Topology topology = topologyNetService.newDatabaseTopology();
- Iterable<IFlowPath> allFlowPaths = dbHandlerInner.getAllFlowPaths();
- for (IFlowPath flowPathObj : allFlowPaths) {
- counterAllFlowPaths++;
- if (flowPathObj == null)
- continue;
-
- String srcDpidStr = flowPathObj.getSrcSwitch();
- if (srcDpidStr == null)
- continue;
- Dpid srcDpid = new Dpid(srcDpidStr);
- //
- // Use the source DPID as a heuristic to decide
- // which controller is responsible for maintaining the
- // shortest path.
- // NOTE: This heuristic is error-prone: if the switch
- // goes away and no controller is responsible for that
- // switch, then the original Flow Path is not cleaned-up
- //
- IOFSwitch mySwitch = mySwitches.get(srcDpid.value());
- if (mySwitch == null)
- continue; // Ignore: not my responsibility
-
- // Test whether we need to maintain this flow
- String flowPathTypeStr = flowPathObj.getFlowPathType();
- if (flowPathTypeStr == null)
- continue; // Could be invalid entry?
- if (! flowPathTypeStr.equals("FP_TYPE_SHORTEST_PATH"))
- continue; // No need to maintain this flow
-
- //
- // Test whether we need to complete the Flow cleanup,
- // if the Flow has been deleted by the user.
- //
- String flowPathUserStateStr = flowPathObj.getFlowPathUserState();
- if ((flowPathUserStateStr != null)
- && flowPathUserStateStr.equals("FP_USER_DELETE")) {
- Iterable<IFlowEntry> flowEntries = flowPathObj.getFlowEntries();
- final boolean empty = !flowEntries.iterator().hasNext();
- if (empty)
- deleteFlows.add(flowPathObj);
- }
-
- // Fetch the fields needed to recompute the shortest path
- String dataPathSummaryStr = flowPathObj.getDataPathSummary();
- Short srcPortShort = flowPathObj.getSrcPort();
- String dstDpidStr = flowPathObj.getDstSwitch();
- Short dstPortShort = flowPathObj.getDstPort();
- Long flowPathFlagsLong = flowPathObj.getFlowPathFlags();
- if ((dataPathSummaryStr == null) ||
- (srcPortShort == null) ||
- (dstDpidStr == null) ||
- (dstPortShort == null) ||
- (flowPathFlagsLong == null)) {
- continue;
- }
-
- Port srcPort = new Port(srcPortShort);
- Dpid dstDpid = new Dpid(dstDpidStr);
- Port dstPort = new Port(dstPortShort);
- SwitchPort srcSwitchPort = new SwitchPort(srcDpid, srcPort);
- SwitchPort dstSwitchPort = new SwitchPort(dstDpid, dstPort);
- FlowPathType flowPathType = FlowPathType.valueOf(flowPathTypeStr);
- FlowPathUserState flowPathUserState = FlowPathUserState.valueOf(flowPathUserStateStr);
- FlowPathFlags flowPathFlags = new FlowPathFlags(flowPathFlagsLong);
-
- counterMyFlowPaths++;
-
- //
- // NOTE: Using here the regular getDatabaseShortestPath()
- // method won't work here, because that method calls
- // internally "conn.endTx(Transaction.COMMIT)", and that
- // will invalidate all handlers to the Titan database.
- // If we want to experiment with calling here
- // getDatabaseShortestPath(), we need to refactor that code
- // to avoid closing the transaction.
- //
- DataPath dataPath =
- topologyNetService.getTopologyShortestPath(
- topology,
- srcSwitchPort,
- dstSwitchPort);
- if (dataPath == null) {
- // We need the DataPath to compare the paths
- dataPath = new DataPath();
- dataPath.setSrcPort(srcSwitchPort);
- dataPath.setDstPort(dstSwitchPort);
- }
- dataPath.applyFlowPathFlags(flowPathFlags);
-
- String newDataPathSummaryStr = dataPath.dataPathSummary();
- if (dataPathSummaryStr.equals(newDataPathSummaryStr))
- continue; // Nothing changed
-
- reconcileFlow(flowPathObj, dataPath);
- }
-
- //
- // Delete all leftover Flows marked for deletion from the
- // Network MAP.
- //
- while (! deleteFlows.isEmpty()) {
- IFlowPath flowPathObj = deleteFlows.poll();
- dbHandlerInner.removeFlowPath(flowPathObj);
- }
-
- topologyNetService.dropTopology(topology);
-
- dbHandlerInner.commit();
-
- long estimatedTime = System.nanoTime() - startTime;
- double rate = 0.0;
- if (estimatedTime > 0)
- rate = ((double)counterAllFlowPaths * 1000000000) / estimatedTime;
- String logMsg = "MEASUREMENT: Processed AllFlowPaths: " +
- counterAllFlowPaths + " MyFlowPaths: " +
- counterMyFlowPaths + " in " +
- (double)estimatedTime / 1000000000 + " sec: " +
- rate + " paths/s";
- log.debug(logMsg);
- }
- };
-
+ // The queue to write Flow Entries to the database
+ private BlockingQueue<FlowPathEntryPair> flowEntriesToDatabaseQueue =
+ new LinkedBlockingQueue<FlowPathEntryPair>();
+ FlowDatabaseWriter flowDatabaseWriter;
/**
* Initialize the Flow Manager.
@@ -462,22 +159,18 @@
throws FloodlightModuleException {
this.context = context;
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
- topologyNetService = context.getServiceImpl(ITopologyNetService.class);
datagridService = context.getServiceImpl(IDatagridService.class);
restApi = context.getServiceImpl(IRestApiService.class);
if (enableFlowPusher) {
- pusher = context.getServiceImpl(IFlowPusherService.class);
+ pusher = context.getServiceImpl(IFlowPusherService.class);
} else {
- messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
EnumSet.of(OFType.FLOW_MOD),
OFMESSAGE_DAMPER_TIMEOUT);
}
this.init("");
-
- mapReaderScheduler = Executors.newScheduledThreadPool(1);
- shortestPathReconcileScheduler = Executors.newScheduledThreadPool(1);
}
/**
@@ -515,22 +208,23 @@
// Initialize the Flow Entry ID generator
nextFlowEntryIdPrefix = randomGenerator.nextInt();
-
+
//
- // Create the Flow Event Handler thread and register it with the
- // Datagrid Service
+ // The thread to write to the database
+ //
+ flowDatabaseWriter = new FlowDatabaseWriter(this,
+ flowEntriesToDatabaseQueue);
+ flowDatabaseWriter.start();
+
+ //
+ // The Flow Event Handler thread:
+ // - create
+ // - register with the Datagrid Service
+ // - startup
//
flowEventHandler = new FlowEventHandler(this, datagridService);
datagridService.registerFlowEventHandlerService(flowEventHandler);
-
- // Schedule the threads and periodic tasks
flowEventHandler.start();
- if (! enableNotifications) {
- mapReaderScheduler.scheduleAtFixedRate(
- mapReader, 3, 3, TimeUnit.SECONDS);
- shortestPathReconcileScheduler.scheduleAtFixedRate(
- shortestPathReconcile, 3, 3, TimeUnit.SECONDS);
- }
}
/**
@@ -616,35 +310,6 @@
}
/**
- * Clear the state for all previously added flows.
- *
- * @return true on success, otherwise false.
- */
- @Override
- public boolean clearAllFlows() {
- if (FlowDatabaseOperation.clearAllFlows(dbHandlerApi)) {
- datagridService.notificationSendAllFlowsRemoved();
- return true;
- }
- return false;
- }
-
- /**
- * Clear the state for a previously added flow.
- *
- * @param flowId the Flow ID of the flow to clear.
- * @return true on success, otherwise false.
- */
- @Override
- public boolean clearFlow(FlowId flowId) {
- if (FlowDatabaseOperation.clearFlow(dbHandlerApi, flowId)) {
- datagridService.notificationSendFlowRemoved(flowId);
- return true;
- }
- return false;
- }
-
- /**
* Get a previously added flow.
*
* @param flowId the Flow ID of the flow to get.
@@ -700,22 +365,13 @@
* @return the Flow Paths if found, otherwise null.
*/
@Override
- public ArrayList<IFlowPath> getAllFlowsSummary(FlowId flowId,
- int maxFlows) {
+ public ArrayList<FlowPath> getAllFlowsSummary(FlowId flowId,
+ int maxFlows) {
return FlowDatabaseOperation.getAllFlowsSummary(dbHandlerApi, flowId,
maxFlows);
}
/**
- * Get all Flows information, without the associated Flow Entries.
- *
- * @return all Flows information, without the associated Flow Entries.
- */
- public ArrayList<IFlowPath> getAllFlowsWithoutFlowEntries() {
- return FlowDatabaseOperation.getAllFlowsWithoutFlowEntries(dbHandlerApi);
- }
-
- /**
* Add and maintain a shortest-path flow.
*
* NOTE: The Flow Path argument does NOT contain flow entries.
@@ -757,87 +413,6 @@
}
/**
- * Reconcile a flow.
- *
- * @param flowObj the flow that needs to be reconciled.
- * @param newDataPath the new data path to use.
- * @return true on success, otherwise false.
- */
- private boolean reconcileFlow(IFlowPath flowObj, DataPath newDataPath) {
- String flowIdStr = flowObj.getFlowId();
-
- //
- // Set the incoming port matching and the outgoing port output
- // actions for each flow entry.
- //
- int idx = 0;
- for (FlowEntry flowEntry : newDataPath.flowEntries()) {
- flowEntry.setFlowId(new FlowId(flowIdStr));
-
- // Mark the Flow Entry as not updated in the switch
- flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
- // Set the incoming port matching
- FlowEntryMatch flowEntryMatch = new FlowEntryMatch();
- flowEntry.setFlowEntryMatch(flowEntryMatch);
- flowEntryMatch.enableInPort(flowEntry.inPort());
-
- //
- // Set the actions
- //
- FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- //
- // If the first Flow Entry, copy the Flow Path actions to it
- //
- if (idx == 0) {
- String actionsStr = flowObj.getActions();
- if (actionsStr != null) {
- FlowEntryActions flowActions = new FlowEntryActions(actionsStr);
- for (FlowEntryAction action : flowActions.actions())
- flowEntryActions.addAction(action);
- }
- }
- idx++;
- //
- // Add the outgoing port output action
- //
- FlowEntryAction flowEntryAction = new FlowEntryAction();
- flowEntryAction.setActionOutput(flowEntry.outPort());
- flowEntryActions.addAction(flowEntryAction);
- }
-
- //
- // Remove the old Flow Entries, and add the new Flow Entries
- //
- Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
- for (IFlowEntry flowEntryObj : flowEntries) {
- flowEntryObj.setUserState("FE_USER_DELETE");
- flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
- }
- for (FlowEntry flowEntry : newDataPath.flowEntries()) {
- addFlowEntry(flowObj, flowEntry);
- }
-
- //
- // Set the Data Path Summary
- //
- String dataPathSummaryStr = newDataPath.dataPathSummary();
- flowObj.setDataPathSummary(dataPathSummaryStr);
-
- return true;
- }
-
- /**
- * Reconcile all flows in a set.
- *
- * @param flowObjSet the set of flows that need to be reconciliated.
- */
- private void reconcileFlows(Iterable<IFlowPath> flowObjSet) {
- if (! flowObjSet.iterator().hasNext())
- return;
- // TODO: Not implemented/used yet.
- }
-
- /**
* Install a Flow Entry on a switch.
*
* @param mySwitch the switch to install the Flow Entry into.
@@ -848,11 +423,11 @@
private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
IFlowEntry flowEntryObj) {
if (enableFlowPusher) {
- return pusher.add(mySwitch, flowObj, flowEntryObj);
+ return pusher.add(mySwitch, flowObj, flowEntryObj);
} else {
- return FlowSwitchOperation.installFlowEntry(
- floodlightProvider.getOFMessageFactory(),
- messageDamper, mySwitch, flowObj, flowEntryObj);
+ return FlowSwitchOperation.installFlowEntry(
+ floodlightProvider.getOFMessageFactory(),
+ messageDamper, mySwitch, flowObj, flowEntryObj);
}
}
@@ -867,11 +442,11 @@
private boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
FlowEntry flowEntry) {
if (enableFlowPusher) {
- return pusher.add(mySwitch, flowPath, flowEntry);
+ return pusher.add(mySwitch, flowPath, flowEntry);
} else {
- return FlowSwitchOperation.installFlowEntry(
- floodlightProvider.getOFMessageFactory(),
- messageDamper, mySwitch, flowPath, flowEntry);
+ return FlowSwitchOperation.installFlowEntry(
+ floodlightProvider.getOFMessageFactory(),
+ messageDamper, mySwitch, flowPath, flowEntry);
}
}
@@ -902,10 +477,6 @@
*/
public void pushModifiedFlowEntriesToSwitches(
Collection<FlowPathEntryPair> modifiedFlowEntries) {
- // TODO: For now, the pushing of Flow Entries is disabled
- if (! enableNotifications)
- return;
-
if (modifiedFlowEntries.isEmpty())
return;
@@ -948,10 +519,6 @@
*/
public void pushModifiedFlowEntriesToDatagrid(
Collection<FlowPathEntryPair> modifiedFlowEntries) {
- // TODO: For now, the pushing of Flow Entries is disabled
- if (! enableNotifications)
- return;
-
if (modifiedFlowEntries.isEmpty())
return;
@@ -1005,6 +572,49 @@
}
/**
+ * Class to implement writing to the database in a separate thread.
+ */
+    class FlowDatabaseWriter extends Thread {
+        private final FlowManager flowManager;
+        private final BlockingQueue<FlowPathEntryPair> blockingQueue;
+
+        /**
+         * Constructor.
+         *
+         * @param flowManager the Flow Manager whose database write method is invoked.
+         * @param blockingQueue the blocking queue this thread takes Flow Entries from.
+         */
+        FlowDatabaseWriter(FlowManager flowManager,
+                           BlockingQueue<FlowPathEntryPair> blockingQueue) {
+            this.flowManager = flowManager;
+            this.blockingQueue = blockingQueue;
+        }
+
+        /**
+         * Run the thread: block for one entry, batch whatever else is queued, write it.
+         */
+        @Override
+        public void run() {
+            // One reusable batch; it is cleared after every database write.
+            Collection<FlowPathEntryPair> collection = new LinkedList<FlowPathEntryPair>();
+            try {
+                while (true) {
+                    FlowPathEntryPair entryPair = blockingQueue.take();
+                    collection.add(entryPair);
+                    blockingQueue.drainTo(collection);
+                    flowManager.writeModifiedFlowEntriesToDatabase(collection);
+                    collection.clear();
+                }
+            } catch (InterruptedException exception) {
+                Thread.currentThread().interrupt(); // restore the status; the thread ends here
+                log.debug("Exception writing to the Database: ", exception);
+            } catch (Exception exception) {
+                log.error("Exception writing to the Database: ", exception);
+            }
+        }
+    }
+
+ /**
* Push Flow Entries to the Network MAP.
*
* NOTE: The Flow Entries are pushed only on the instance responsible
@@ -1013,12 +623,26 @@
*
* @param modifiedFlowEntries the collection of Flow Entries to push.
*/
- public void pushModifiedFlowEntriesToDatabase(
+ void pushModifiedFlowEntriesToDatabase(
Collection<FlowPathEntryPair> modifiedFlowEntries) {
- // TODO: For now, the pushing of Flow Entries is disabled
- if (! enableNotifications)
- return;
+ //
+ // We only add the Flow Entries to the Database Queue.
+ // The FlowDatabaseWriter thread is responsible for the actual writing.
+ //
+ flowEntriesToDatabaseQueue.addAll(modifiedFlowEntries);
+ }
+ /**
+ * Write Flow Entries to the Network MAP.
+ *
+ * NOTE: The Flow Entries are written only on the instance responsible
+ * for the first switch. This is to avoid database errors when multiple
+ * instances are writing Flow Entries for the same Flow Path.
+ *
+ * @param modifiedFlowEntries the collection of Flow Entries to write.
+ */
+ private void writeModifiedFlowEntriesToDatabase(
+ Collection<FlowPathEntryPair> modifiedFlowEntries) {
if (modifiedFlowEntries.isEmpty())
return;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
index f39acb5..ba3a6e7 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
@@ -3,7 +3,6 @@
import java.util.ArrayList;
import net.floodlightcontroller.core.module.IFloodlightService;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.util.CallerId;
import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
@@ -42,21 +41,6 @@
boolean deleteFlow(FlowId flowId);
/**
- * Clear the state for all previously added flows.
- *
- * @return true on success, otherwise false.
- */
- boolean clearAllFlows();
-
- /**
- * Clear the state for a previously added flow.
- *
- * @param flowId the Flow ID of the flow to clear.
- * @return true on success, otherwise false.
- */
- boolean clearFlow(FlowId flowId);
-
- /**
* Get a previously added flow.
*
* @param flowId the Flow ID of the flow to get.
@@ -97,7 +81,7 @@
* @param maxFlows number of flows to return
* @return the Flow Paths if found, otherwise null.
*/
- ArrayList<IFlowPath> getAllFlowsSummary(FlowId flowId, int maxFlows);
+ ArrayList<FlowPath> getAllFlowsSummary(FlowId flowId, int maxFlows);
/**
* Add and maintain a shortest-path flow.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/ClearFlowResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/ClearFlowResource.java
deleted file mode 100644
index b8942b9..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/ClearFlowResource.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package net.onrc.onos.ofcontroller.flowmanager.web;
-
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
-import net.onrc.onos.ofcontroller.util.FlowId;
-
-import org.restlet.resource.Get;
-import org.restlet.resource.ServerResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Flow Manager REST API implementation: Clear internal Flow state.
- *
- * The "{flow-id}" request attribute value can be either a specific Flow ID,
- * or the keyword "all" to clear all Flows:
- *
- * GET /wm/flow/clear/{flow-id}/json
- */
-public class ClearFlowResource extends ServerResource {
- protected final static Logger log = LoggerFactory.getLogger(ClearFlowResource.class);
-
- /**
- * Implement the API.
- *
- * @return true on success, otehrwise false.
- */
- @Get("json")
- public Boolean retrieve() {
- Boolean result = false;
-
- IFlowService flowService =
- (IFlowService)getContext().getAttributes().
- get(IFlowService.class.getCanonicalName());
-
- if (flowService == null) {
- log.debug("ONOS Flow Service not found");
- return result;
- }
-
- // Extract the arguments
- String flowIdStr = (String) getRequestAttributes().get("flow-id");
-
- // Process the request
- if (flowIdStr.equals("all")) {
- log.debug("Clear All Flows");
- result = flowService.clearAllFlows();
- } else {
- FlowId flowId = new FlowId(flowIdStr);
- log.debug("Clear Flow Id: " + flowIdStr);
- result = flowService.clearFlow(flowId);
- }
- return result;
- }
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
index e027270..81d26dd 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
@@ -18,7 +18,6 @@
Router router = new Router(context);
router.attach("/add/json", AddFlowResource.class);
router.attach("/add-shortest-path/json", AddShortestPathFlowResource.class);
- router.attach("/clear/{flow-id}/json", ClearFlowResource.class);
router.attach("/delete/{flow-id}/json", DeleteFlowResource.class);
router.attach("/get/{flow-id}/json", GetFlowByIdResource.class);
router.attach("/getall-by-installer-id/{installer-id}/{src-dpid}/{src-port}/{dst-dpid}/{dst-port}/json", GetAllFlowsByInstallerIdResource.class);
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
index 89e5b01..58f82a9 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
@@ -2,8 +2,8 @@
import java.util.ArrayList;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.FlowId;
import org.restlet.resource.Get;
@@ -31,8 +31,8 @@
* @return the collection of Flow states if any found, otherwise null.
*/
@Get("json")
- public ArrayList<IFlowPath> retrieve() {
- ArrayList<IFlowPath> result = null;
+ public ArrayList<FlowPath> retrieve() {
+ ArrayList<FlowPath> result = null;
FlowId flowId;
int maxFlows = 0;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index cbfee6b..45f59b3 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -25,7 +25,7 @@
public class FlowProgrammer implements IFloodlightModule,
IOFMessageListener,
IOFSwitchListener {
-
+ @SuppressWarnings("unused")
private static final boolean enableFlowSync = false;
protected static Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
protected volatile IFloodlightProviderService floodlightProvider;
@@ -49,7 +49,7 @@
throws FloodlightModuleException {
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
registryService = context.getServiceImpl(IControllerRegistryService.class);
- pusher.init(null, floodlightProvider.getOFMessageFactory(), null);
+ pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
if (enableFlowSync) {
synchronizer.init(pusher);
}
@@ -149,5 +149,4 @@
// TODO Auto-generated method stub
}
-
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 532477a..f43a83e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -8,6 +8,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
import org.openflow.protocol.*;
import org.openflow.protocol.action.*;
@@ -16,7 +18,12 @@
import org.slf4j.LoggerFactory;
import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.floodlightcontroller.util.MACAddress;
import net.floodlightcontroller.util.OFMessageDamper;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
@@ -38,9 +45,11 @@
* @author Naoki Shiota
*
*/
-public class FlowPusher implements IFlowPusherService {
+public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+ private static boolean barrierIfEmpty = false;
+
// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
@@ -64,6 +73,12 @@
SUSPENDED,
}
+ /**
+ * Message queue attached to a switch.
+ * This consists of queue itself and variables used for limiting sending rate.
+ * @author Naoki Shiota
+ *
+ */
@SuppressWarnings("serial")
private class SwitchQueue extends ArrayDeque<OFMessage> {
QueueState state;
@@ -84,15 +99,20 @@
return true;
}
- long rate = last_sent_size / (current - last_sent_time);
-
- if (rate < max_rate) {
- return true;
- } else {
+ if (current == last_sent_time) {
return false;
}
+
+ // Check if sufficient time (from aspect of rate) elapsed or not.
+ long rate = last_sent_size / (current - last_sent_time);
+ return (rate < max_rate);
}
+ /**
+ * Log time and size of last sent data.
+ * @param current Time to be sent.
+ * @param size Size of sent data (in bytes).
+ */
void logSentData(long current, long size) {
last_sent_time = current;
last_sent_size = size;
@@ -100,11 +120,14 @@
}
- private OFMessageDamper messageDamper;
+ private OFMessageDamper messageDamper = null;
+ private IThreadPoolService threadPool = null;
private FloodlightContext context = null;
private BasicFactory factory = null;
- private Map<Long, FlowPusherProcess> threadMap = null;
+ private Map<Long, FlowPusherThread> threadMap = null;
+ private Map<Long, Map<Integer, OFBarrierReplyFuture>>
+ barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
private int number_thread = 1;
@@ -113,26 +136,31 @@
* @author Naoki Shiota
*
*/
- private class FlowPusherProcess implements Runnable {
+ private class FlowPusherThread extends Thread {
private Map<IOFSwitch,SwitchQueue> queues
= new HashMap<IOFSwitch,SwitchQueue>();
- private boolean isStopped = false;
- private boolean isMsgAdded = false;
+ private Semaphore mutex = new Semaphore(0);
@Override
public void run() {
log.debug("Begin Flow Pusher Process");
while (true) {
+ try {
+ // wait for message pushed to queue
+ mutex.acquire();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ log.debug("FlowPusherThread is interrupted");
+ return;
+ }
+
Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
synchronized (queues) {
entries = queues.entrySet();
}
- // Set taint flag to false at this moment.
- isMsgAdded = false;
-
for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
IOFSwitch sw = entry.getKey();
SwitchQueue queue = entry.getValue();
@@ -152,15 +180,14 @@
int i = 0;
while (! queue.isEmpty()) {
// Number of messages excess the limit
- if (++i >= MAX_MESSAGE_SEND) {
+ if (i >= MAX_MESSAGE_SEND) {
// Messages remains in queue
- isMsgAdded = true;
+ mutex.release();
break;
}
+ ++i;
OFMessage msg = queue.poll();
-
- // if need to send, call IOFSwitch#write()
try {
messageDamper.write(sw, msg, context);
log.debug("Pusher sends message : {}", msg);
@@ -172,42 +199,50 @@
}
sw.flush();
queue.logSentData(current_time, size);
+
+ if (queue.isEmpty() && barrierIfEmpty) {
+ barrier(sw);
+ }
}
}
}
-
- // sleep while all queues are empty
- while (! (isMsgAdded || isStopped)) {
- try {
- Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
- } catch (InterruptedException e) {
- e.printStackTrace();
- log.error("Thread.sleep failed");
- }
- }
-
- log.debug("Exit sleep loop.");
-
- if (isStopped) {
- log.debug("Pusher Process finished.");
- return;
- }
-
}
}
}
+ /**
+ * Initialize object with one thread.
+ */
public FlowPusher() {
-
}
+ /**
+ * Initialize object with threads of given number.
+ * @param number_thread Number of threads to handle messages.
+ */
public FlowPusher(int number_thread) {
this.number_thread = number_thread;
}
- public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
+ /**
+ * Set parameters needed for sending messages.
+ * @param context FloodlightContext used for sending messages.
+ * If null, FlowPusher uses default context.
+ * @param modContext FloodlightModuleContext used for acquiring
+ * ThreadPoolService and registering MessageListener.
+ * @param factory Factory object to create OFMessage objects.
+ * @param damper Message damper used for sending messages.
+ * If null, FlowPusher creates its own damper object.
+ */
+ public void init(FloodlightContext context,
+ FloodlightModuleContext modContext,
+ BasicFactory factory,
+ OFMessageDamper damper) {
this.context = context;
this.factory = factory;
+ this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
+ IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
+ flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
if (damper != null) {
messageDamper = damper;
@@ -228,12 +263,11 @@
return;
}
- threadMap = new HashMap<Long,FlowPusherProcess>();
+ threadMap = new HashMap<Long,FlowPusherThread>();
for (long i = 0; i < number_thread; ++i) {
- FlowPusherProcess runnable = new FlowPusherProcess();
- threadMap.put(i, runnable);
+ FlowPusherThread thread = new FlowPusherThread();
- Thread thread = new Thread(runnable);
+ threadMap.put(i, thread);
thread.start();
}
}
@@ -302,10 +336,8 @@
return;
}
- for (FlowPusherProcess runnable : threadMap.values()) {
- if (! runnable.isStopped) {
- runnable.isStopped = true;
- }
+ for (FlowPusherThread t : threadMap.values()) {
+ t.interrupt();
}
}
@@ -326,14 +358,14 @@
}
/**
- * Add OFMessage to the queue related to given switch.
+ * Add OFMessage to queue of the switch.
* @param sw Switch to which message is sent.
* @param msg Message to be sent.
* @return true if succeed.
*/
@Override
public boolean add(IOFSwitch sw, OFMessage msg) {
- FlowPusherProcess proc = getProcess(sw);
+ FlowPusherThread proc = getProcess(sw);
SwitchQueue queue = proc.queues.get(sw);
if (queue == null) {
@@ -349,8 +381,10 @@
log.debug("Message is pushed : {}", msg);
}
- proc.isMsgAdded = true;
-
+ if (proc.mutex.availablePermits() == 0) {
+ proc.mutex.release();
+ }
+
return true;
}
@@ -984,8 +1018,56 @@
return add(sw,fm);
}
+
+    /** Queue a BARRIER request and block for its reply; null if sw is null or the wait fails. */
+    @Override
+    public OFBarrierReply barrier(IOFSwitch sw) {
+        OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+        if (future == null) {
+            return null;
+        }
+
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+            log.error("InterruptedException: ", e);
+            return null;
+        } catch (ExecutionException e) {
+            log.error("ExecutionException: ", e);
+            return null;
+        }
+    }
- private SwitchQueue getQueue(IOFSwitch sw) {
+    /** Queue a BARRIER request for sw and return the future of its reply (null if sw is null). */
+    @Override
+    public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+        // TODO creation of message and future should be moved to OFSwitchImpl
+        if (sw == null) {
+            return null;
+        }
+
+        OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+        msg.setXid(sw.getNextTransactionId());
+        OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+
+        // Register the future BEFORE queueing the request, so that receive() can
+        // always find it, however quickly the switch answers.
+        synchronized (barrierFutures) {
+            Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
+            if (map == null) {
+                map = new HashMap<Integer,OFBarrierReplyFuture>();
+                barrierFutures.put(sw.getId(), map);
+            }
+            map.put(msg.getXid(), future);
+            log.debug("Inserted future for {}", msg.getXid());
+        }
+
+        add(sw, msg);
+        return future;
+    }
+
+ protected SwitchQueue getQueue(IOFSwitch sw) {
if (sw == null) {
return null;
}
@@ -993,14 +1075,48 @@
return getProcess(sw).queues.get(sw);
}
- private long getHash(IOFSwitch sw) {
- // TODO should consider equalization algorithm
+ protected long getHash(IOFSwitch sw) {
+ // This code assumes DPID is sequentially assigned.
+ // TODO consider equalization algorithm
return sw.getId() % number_thread;
}
- private FlowPusherProcess getProcess(IOFSwitch sw) {
+ protected FlowPusherThread getProcess(IOFSwitch sw) {
long hash = getHash(sw);
return threadMap.get(hash);
}
+
+ @Override
+ public String getName() {
+ return "flowpusher";
+ }
+
+ @Override
+ public boolean isCallbackOrderingPrereq(OFType type, String name) {
+ return false;
+ }
+
+ @Override
+ public boolean isCallbackOrderingPostreq(OFType type, String name) {
+ return false;
+ }
+
+    /** Complete (and forget) the pending barrier future matching this reply, if there is one. */
+    @Override
+    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+        OFBarrierReplyFuture future = null;
+        // Same lock as barrierAsync(): the maps are plain HashMaps.
+        synchronized (barrierFutures) {
+            Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
+            if (map != null) {
+                future = map.remove(msg.getXid()); // drop the entry once it is answered
+            }
+        }
+        if (future != null) {
+            log.debug("Received BARRIER_REPLY : {}", msg);
+            future.deliverFuture(sw, msg);
+        }
+        return Command.CONTINUE;
+    }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index e16dd20..94d6e35 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -1,43 +1,75 @@
package net.onrc.onos.ofcontroller.flowprogrammer;
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowPath;
-import org.openflow.protocol.OFMessage;
-
public interface IFlowPusherService extends IFloodlightService {
/**
- * Add a message to the queue of a switch.
- * @param sw
- * @param msg
- * @return
+ * Add a message to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param msg Message object to be added.
+ * @return true if message is successfully added to a queue.
*/
boolean add(IOFSwitch sw, OFMessage msg);
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param flowPath FlowPath object used for creating message.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry);
+
+ /**
+ * Create a message from IFlowEntry and add it to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param flowObj IFlowPath object used for creating message.
+ * @param flowEntryObj IFlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj);
+
+ /**
+ * Add BARRIER message to queue and wait for reply.
+ * @param sw Switch to which barrier message is pushed.
+ * @return BARRIER_REPLY message sent from switch.
+ */
+ OFBarrierReply barrier(IOFSwitch sw);
+
+ /**
+ * Add BARRIER message to queue asynchronously.
+ * @param sw Switch to which barrier message is pushed.
+ * @return Future object of BARRIER_REPLY message which will be sent from switch.
+ */
+ OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw);
/**
* Suspend pushing message to a switch.
- * @param sw
+ * @param sw Switch for which message pushing is to be suspended.
* @return true if success
*/
boolean suspend(IOFSwitch sw);
/**
* Resume pushing message to a switch.
- * @param sw
+ * @param sw Switch for which message pushing is to be resumed.
* @return true if success
*/
boolean resume(IOFSwitch sw);
/**
* Get whether pushing of message is suspended or not.
- * @param sw
- * @return true if suspended
+ * @param sw Switch to be checked.
+ * @return true if suspended.
*/
boolean isSuspended(IOFSwitch sw);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
new file mode 100644
index 0000000..3013f5a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
@@ -0,0 +1,74 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import java.util.concurrent.TimeUnit;
+
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFType;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+
+/**
+ * Future holding the BARRIER_REPLY message for one transaction id.
+ */
+public class OFBarrierReplyFuture extends OFMessageFuture<OFBarrierReply> {
+
+    /** Set by handleReply(); reported by isFinished(). */
+    protected volatile boolean finished;
+
+    /**
+     * Constructor without an explicit timeout.
+     *
+     * @param tp thread pool service handed to the superclass.
+     * @param sw switch this future is associated with.
+     * @param transactionId transaction id of the awaited reply.
+     */
+    public OFBarrierReplyFuture(IThreadPoolService tp,
+            IOFSwitch sw, int transactionId) {
+        // The message awaited by this future is a BARRIER_REPLY.
+        super(tp, sw, OFType.BARRIER_REPLY, transactionId);
+        init();
+    }
+
+    /**
+     * Constructor with an explicit timeout.
+     *
+     * @param tp thread pool service handed to the superclass.
+     * @param sw switch this future is associated with.
+     * @param transactionId transaction id of the awaited reply.
+     * @param timeout timeout value handed to the superclass.
+     * @param unit unit of the timeout value.
+     */
+    public OFBarrierReplyFuture(IThreadPoolService tp,
+            IOFSwitch sw, int transactionId, long timeout, TimeUnit unit) {
+        super(tp, sw, OFType.BARRIER_REPLY, transactionId, timeout, unit);
+        init();
+    }
+
+    /** Reset to the "no reply yet" state. */
+    private void init() {
+        this.finished = false;
+        this.result = null;
+    }
+
+    /** Store the reply and mark this future as finished. */
+    @Override
+    protected void handleReply(IOFSwitch sw, OFMessage msg) {
+        this.result = (OFBarrierReply) msg;
+        this.finished = true;
+    }
+
+    /** @return true once a reply has been handled. */
+    @Override
+    protected boolean isFinished() {
+        return finished;
+    }
+
+    /** Delegates to the superclass unchanged. */
+    @Override
+    protected void unRegister() {
+        super.unRegister();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntry.java b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntry.java
index 15a6233..98dbd88 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntry.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntry.java
@@ -136,10 +136,11 @@
*
* @return true if the Flow ID is valid, otherwise false.
*/
+ @JsonIgnore
public boolean isValidFlowId() {
if (this.flowId == null)
return false;
- return (this.flowId.value() != 0);
+ return (this.flowId.isValid());
}
/**
@@ -165,10 +166,11 @@
*
* @return true if the Flow Entry ID is valid, otherwise false.
*/
+ @JsonIgnore
public boolean isValidFlowEntryId() {
if (this.flowEntryId == null)
return false;
- return (this.flowEntryId.value() != 0);
+ return (this.flowEntryId.isValid());
}
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryActions.java b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryActions.java
index 53aab66..7d9688b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryActions.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryActions.java
@@ -2,6 +2,7 @@
import java.util.ArrayList;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
/**
@@ -79,6 +80,7 @@
*
* @return true if the set of actions is empty, otherwise false.
*/
+ @JsonIgnore
public Boolean isEmpty() {
return actions.isEmpty();
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java
index 29efe4c..f5728b0 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java
@@ -5,6 +5,7 @@
import net.onrc.onos.ofcontroller.util.serializers.FlowEntryIdDeserializer;
import net.onrc.onos.ofcontroller.util.serializers.FlowEntryIdSerializer;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.map.annotate.JsonDeserialize;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -20,7 +21,7 @@
* Default constructor.
*/
public FlowEntryId() {
- this.value = 0;
+ this.value = -1;
}
/**
@@ -66,7 +67,17 @@
public void setValue(long value) {
this.value = value;
}
-
+
+ /**
+ * Test whether the Flow Entry ID is valid.
+ *
+ * @return true if the Flow Entry ID is valid, otherwise false.
+ */
+ @JsonIgnore
+ public boolean isValid() {
+ return (this.value() != -1);
+ }
+
/**
* Returns true of the object is another Flow Entry ID with
* the same value; otherwise, returns false.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/FlowId.java b/src/main/java/net/onrc/onos/ofcontroller/util/FlowId.java
index de955ba..d90e96f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/FlowId.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/FlowId.java
@@ -5,6 +5,7 @@
import net.onrc.onos.ofcontroller.util.serializers.FlowIdDeserializer;
import net.onrc.onos.ofcontroller.util.serializers.FlowIdSerializer;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.map.annotate.JsonDeserialize;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -20,7 +21,7 @@
* Default constructor.
*/
public FlowId() {
- this.value = 0;
+ this.value = -1;
}
/**
@@ -68,6 +69,16 @@
}
/**
+ * Test whether the Flow ID is valid.
+ *
+ * @return true if the Flow ID is valid, otherwise false.
+ */
+ @JsonIgnore
+ public boolean isValid() {
+ return (this.value() != -1);
+ }
+
+ /**
* Convert the Flow ID value to a hexadecimal string.
*
* @return the Flow ID value to a hexadecimal string.
diff --git a/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java b/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
index 7fd0f67..8a727d3 100644
--- a/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
+++ b/src/test/java/net/onrc/onos/ofcontroller/flowmanager/FlowManagerTest.java
@@ -239,49 +239,48 @@
verifyAll();
assertTrue(result);
}
-
+
/**
- * Test method for {@link FlowManager#deleteAllFlows()}.
- * @throws Exception
+ * Test method for {@link FlowManager#deleteFlow(FlowId)}: expects every flow entry and the flow path itself to be removed.
+ * @throws Exception
*/
@Test
- public final void testDeleteAllFlowsSuccessNormally() throws Exception {
+ public final void testDeleteFlowSuccessNormally() throws Exception {
// create mock objects
- IFlowPath flowPath1 = createNiceMock(IFlowPath.class);
- IFlowPath flowPath2 = createNiceMock(IFlowPath.class);
+ IFlowPath flowPath = createIFlowPathMock(123, "id", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 1, 2, 3, 4);
+ IFlowEntry flowEntry1 = createMock(IFlowEntry.class);
+ IFlowEntry flowEntry2 = createMock(IFlowEntry.class);
+ IFlowEntry flowEntry3 = createMock(IFlowEntry.class);
// instantiate required objects
- ArrayList<IFlowPath> flowPaths = new ArrayList<IFlowPath>();
- flowPaths.add(flowPath1);
- flowPaths.add(flowPath2);
FlowManager fm = new FlowManager();
-
+ FlowId flowId = new FlowId(123);
+ ArrayList<IFlowEntry> flowEntries = new ArrayList<IFlowEntry>();
+ flowEntries.add(flowEntry1);
+ flowEntries.add(flowEntry2);
+ flowEntries.add(flowEntry3);
+
// setup expectations
expectInitWithContext();
- expect(op.getAllFlowPaths()).andReturn(flowPaths);
-
- expect(flowPath1.getFlowId()).andReturn("1").anyTimes();
- expect(op.searchFlowPath(cmpEq(new FlowId(1)))).andReturn(flowPath1);
- expect(flowPath1.getFlowEntries()).andReturn(new ArrayList<IFlowEntry>());
- op.removeFlowPath(flowPath1);
-
- expect(flowPath2.getFlowId()).andReturn("2").anyTimes();
- expect(op.searchFlowPath(cmpEq(new FlowId(2)))).andReturn(flowPath2);
- expect(flowPath2.getFlowEntries()).andReturn(new ArrayList<IFlowEntry>());
- op.removeFlowPath(flowPath2);
-
+ expect(op.searchFlowPath(cmpEq(flowId))).andReturn(flowPath);
+ expect(flowPath.getFlowEntries()).andReturn(flowEntries);
+ flowPath.removeFlowEntry(flowEntry1);
+ flowPath.removeFlowEntry(flowEntry2);
+ flowPath.removeFlowEntry(flowEntry3);
+ op.removeFlowEntry(flowEntry1);
+ op.removeFlowEntry(flowEntry2);
+ op.removeFlowEntry(flowEntry3);
+ op.removeFlowPath(flowPath);
op.commit();
- expectLastCall().anyTimes();
// start the test
replayAll();
fm.init(context);
- Boolean result = fm.deleteAllFlows();
+ fm.deleteFlow(flowId); // NOTE(review): the return value is not checked here - confirm whether it should be asserted
// verify the test
verifyAll();
- assertTrue(result);
}
/**
@@ -316,16 +315,16 @@
}
/**
- * Test method for {@link FlowManager#clearAllFlows()}.
+ * Test method for {@link FlowManager#deleteAllFlows()}: deleteFlow() is mocked and expected only for flow IDs 1 and 3.
* @throws Exception
*/
@Test
- public final void testClearAllFlowsSuccessNormally() throws Exception {
+ public final void testDeleteAllFlowsSuccessNormally() throws Exception {
// create mock objects
IFlowPath flowPath1 = createNiceMock(IFlowPath.class);
IFlowPath flowPath2 = createNiceMock(IFlowPath.class);
IFlowPath flowPath3 = createNiceMock(IFlowPath.class);
- FlowManager fm = createPartialMockAndInvokeDefaultConstructor(FlowManager.class, "clearFlow");
+ FlowManager fm = createPartialMockAndInvokeDefaultConstructor(FlowManager.class, "deleteFlow");
// instantiate required objects
ArrayList<IFlowPath> flowPaths = new ArrayList<IFlowPath>();
@@ -340,16 +339,16 @@
expect(flowPath1.getFlowId()).andReturn(new FlowId(1).toString());
expect(flowPath2.getFlowId()).andReturn(null);
expect(flowPath3.getFlowId()).andReturn(new FlowId(3).toString());
- expect(fm.clearFlow(cmpEq(new FlowId(1)))).andReturn(true);
- expect(fm.clearFlow(cmpEq(new FlowId(3)))).andReturn(true);
+ expect(fm.deleteFlow(cmpEq(new FlowId(1)))).andReturn(true);
+ expect(fm.deleteFlow(cmpEq(new FlowId(3)))).andReturn(true); // no call is expected for flowPath2, whose getFlowId() returns null
// start the test
replayAll();
fm.init(context);
- Boolean result = fm.clearAllFlows();
+ Boolean result = fm.deleteAllFlows();
- //verify the test
+ // verify the test
verifyAll();
assertTrue(result);
}
@@ -464,35 +463,35 @@
*/
@Test
public final void testGetAllFlowsSummarySuccessNormally() throws Exception {
- final String getAllFlowsWithoutFlowEntries = "getAllFlowsWithoutFlowEntries";
+ final String getAllFlowsWithDataPathSummary = "getAllFlowsWithDataPathSummary";
// create mock objects
- FlowManager fm = createPartialMockAndInvokeDefaultConstructor(FlowManager.class, getAllFlowsWithoutFlowEntries);
- IFlowPath flowPath1 = createIFlowPathMock(1, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 1, 2, 3, 4);
- IFlowPath flowPath2 = createIFlowPathMock(5, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 2, 3, 4, 5);
- IFlowPath flowPath3 = createIFlowPathMock(10, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 3, 4, 5, 6);
+ FlowManager fm = createPartialMockAndInvokeDefaultConstructor(FlowManager.class, getAllFlowsWithDataPathSummary);
+ FlowPath flowPath1 = createTestFlowPath(1, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 1, 2, 3, 4);
+ FlowPath flowPath2 = createTestFlowPath(5, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 2, 3, 4, 5);
+ FlowPath flowPath3 = createTestFlowPath(10, "", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 3, 4, 5, 6);
// instantiate required objects
- ArrayList<IFlowPath> flows = new ArrayList<IFlowPath>();
+ ArrayList<FlowPath> flows = new ArrayList<FlowPath>();
flows.add(flowPath3);
flows.add(flowPath1);
flows.add(flowPath2);
// setup expectations
expectInitWithContext();
- expectPrivate(fm, getAllFlowsWithoutFlowEntries).andReturn(flows);
+ expectPrivate(fm, getAllFlowsWithDataPathSummary).andReturn(flows);
// start the test
replayAll();
fm.init(context);
- ArrayList<IFlowPath> returnedFlows = fm.getAllFlowsSummary(null, 0);
+ ArrayList<FlowPath> returnedFlows = fm.getAllFlowsSummary(null, 0);
// verify the test
verifyAll();
assertEquals(3, returnedFlows.size());
- assertEquals(1, new FlowId(returnedFlows.get(0).getFlowId()).value());
- assertEquals(5, new FlowId(returnedFlows.get(1).getFlowId()).value());
- assertEquals(10, new FlowId(returnedFlows.get(2).getFlowId()).value());
+ assertEquals(1, returnedFlows.get(0).flowId().value()); // flows were added as 10, 1, 5; expected back in ascending flow ID order
+ assertEquals(5, returnedFlows.get(1).flowId().value());
+ assertEquals(10, returnedFlows.get(2).flowId().value());
}
/**
@@ -786,85 +785,6 @@
// other methods
-
- /**
- * Test method for {@link FlowManager#clearFlow(FlowId)}.
- * @throws Exception
- */
- @Test
- public final void testClearFlowSuccessNormally() throws Exception {
- // create mock objects
- IFlowPath flowPath = createIFlowPathMock(123, "id", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 1, 2, 3, 4);
- IFlowEntry flowEntry1 = createMock(IFlowEntry.class);
- IFlowEntry flowEntry2 = createMock(IFlowEntry.class);
- IFlowEntry flowEntry3 = createMock(IFlowEntry.class);
-
- // instantiate required objects
- FlowManager fm = new FlowManager();
- FlowId flowId = new FlowId(123);
- ArrayList<IFlowEntry> flowEntries = new ArrayList<IFlowEntry>();
- flowEntries.add(flowEntry1);
- flowEntries.add(flowEntry2);
- flowEntries.add(flowEntry3);
-
- // setup expectations
- expectInitWithContext();
- expect(op.searchFlowPath(cmpEq(flowId))).andReturn(flowPath);
- expect(flowPath.getFlowEntries()).andReturn(flowEntries);
- flowPath.removeFlowEntry(flowEntry1);
- flowPath.removeFlowEntry(flowEntry2);
- flowPath.removeFlowEntry(flowEntry3);
- op.removeFlowEntry(flowEntry1);
- op.removeFlowEntry(flowEntry2);
- op.removeFlowEntry(flowEntry3);
- op.removeFlowPath(flowPath);
- op.commit();
-
- // start the test
- replayAll();
-
- fm.init(context);
- fm.clearFlow(flowId);
-
- // verify the test
- verifyAll();
- }
-
- /**
- * Test method for {@link FlowManager#getAllFlowsWithoutFlowEntries()}.
- * @throws Exception
- */
- @Test
- public final void testGetAllFlowsWithoutFlowEntriesSuccessNormally() throws Exception {
- // create mock objects
- IFlowPath iFlowPath1 = createIFlowPathMock(1, "caller id", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 1, 1, 2, 2);
- IFlowPath iFlowPath2 = createIFlowPathMock(2, "caller id", "FP_TYPE_SHORTEST_PATH", "FP_USER_ADD", 0, 2, 5, 3, 5);
-
- // instantiate required objects
- ArrayList<IFlowPath> flowPaths = new ArrayList<IFlowPath>();
- flowPaths.add(iFlowPath1);
- flowPaths.add(iFlowPath2);
- FlowManager fm = new FlowManager();
-
- // setup expectations
- expectInitWithContext();
- op.commit();
- expect(op.getAllFlowPaths()).andReturn(flowPaths);
-
- // start the test
- replayAll();
-
- fm.init(context);
- ArrayList<IFlowPath> result = fm.getAllFlowsWithoutFlowEntries();
-
- // verify the test
- verifyAll();
- assertEquals(iFlowPath1, result.get(0));
- assertEquals(iFlowPath2, result.get(1));
-
- // TODO: does this method just return the replica of the flow paths?
- }
-
/**
* Test method for {@link FlowManager#reconcileFlow(IFlowPath, DataPath)}.
* @throws Exception
diff --git a/web/clear_flow.py b/web/clear_flow.py
deleted file mode 100755
index db70d40..0000000
--- a/web/clear_flow.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#! /usr/bin/env python
-# -*- Mode: python; py-indent-offset: 4; tab-width: 8; indent-tabs-mode: t; -*-
-
-import pprint
-import os
-import sys
-import subprocess
-import json
-import argparse
-import io
-import time
-
-from flask import Flask, json, Response, render_template, make_response, request
-
-#
-# TODO: remove this! We don't use JSON argument here!
-# curl http://127.0.0.1:8080/wm/flow/clear/{"value":"0xf"}/json'
-#
-
-## Global Var ##
-ControllerIP="127.0.0.1"
-ControllerPort=8080
-
-DEBUG=0
-pp = pprint.PrettyPrinter(indent=4)
-
-app = Flask(__name__)
-
-## Worker Functions ##
-def log_error(txt):
- print '%s' % (txt)
-
-def debug(txt):
- if DEBUG:
- print '%s' % (txt)
-
-# @app.route("/wm/flow/clear/<flow-id>/json")
-def clear_flow_path(flow_id):
- command = "curl -s \"http://%s:%s/wm/flow/clear/%s/json\"" % (ControllerIP, ControllerPort, flow_id)
- debug("clear_flow_path %s" % command)
- result = os.popen(command).read()
- debug("result %s" % result)
- # parsedResult = json.loads(result)
- # debug("parsed %s" % parsedResult)
-
-if __name__ == "__main__":
- usage_msg = "Clear flow state from the ONOS Network Map\n"
- usage_msg = usage_msg + "Usage: %s <begin-flow-id> <end-flow-id>\n" % (sys.argv[0])
- usage_msg = usage_msg + " %s <flow-id>\n" % (sys.argv[0])
- usage_msg = usage_msg + "\n"
- usage_msg = usage_msg + " Arguments:\n"
- usage_msg = usage_msg + " <begin-flow-id> <end-flow-id> Clear all flows in the flow ID range\n"
- usage_msg = usage_msg + " <flow-id> Clear a single flow with the flow ID\n"
- usage_msg = usage_msg + " all Clear all flows\n"
-
- # app.debug = False;
-
- # Usage info
- if len(sys.argv) > 1 and (sys.argv[1] == "-h" or sys.argv[1] == "--help"):
- print(usage_msg)
- exit(0)
-
- # Check arguments
- if len(sys.argv) < 2:
- log_error(usage_msg)
- exit(1)
-
- if (sys.argv[1] == "all"):
- clear_flow_path(sys.argv[1])
- else:
- begin_flow_id = int(sys.argv[1], 0)
- if len(sys.argv) >= 3:
- end_flow_id = int(sys.argv[2], 0)
- else:
- end_flow_id = begin_flow_id
-
- # Do the work
- flow_id = begin_flow_id
- while flow_id <= end_flow_id:
- clear_flow_path(flow_id)
- flow_id = flow_id + 1