Merge branch 'syncdev17' of https://github.com/n-shiota/ONOS into syncdev17
Conflicts:
src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index f427beb..04cde1d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -416,25 +416,6 @@
* Install a Flow Entry on a switch.
*
* @param mySwitch the switch to install the Flow Entry into.
- * @param flowObj the flow path object for the flow entry to install.
- * @param flowEntryObj the flow entry object to install.
- * @return true on success, otherwise false.
- */
- private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
- IFlowEntry flowEntryObj) {
- if (enableFlowPusher) {
- return pusher.add(mySwitch, flowObj, flowEntryObj);
- } else {
- return FlowSwitchOperation.installFlowEntry(
- floodlightProvider.getOFMessageFactory(),
- messageDamper, mySwitch, flowObj, flowEntryObj);
- }
- }
-
- /**
- * Install a Flow Entry on a switch.
- *
- * @param mySwitch the switch to install the Flow Entry into.
* @param flowPath the flow path for the flow entry to install.
* @param flowEntry the flow entry to install.
* @return true on success, otherwise false.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index bd54755..33c9a6a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -25,8 +25,13 @@
/**
* FlowProgrammer is a module responsible to maintain flows installed to switches.
* FlowProgrammer consists of FlowPusher and FlowSynchronizer.
- * FlowPusher manages the rate of installation, and FlowSynchronizer synchronizez
+ * FlowPusher manages the rate of installation, and FlowSynchronizer synchronizes
* flows between GraphDB and switches.
+ * FlowProgrammer also watch the event of addition/deletion of switches to
+ * start/stop synchronization. When a switch is added to network, FlowProgrammer
+ * immediately kicks synchronization to keep switch's flow table latest state.
+ * Adversely, when a switch is removed from network, FlowProgrammer immediately
+ * stops synchronization.
* @author Brian
*
*/
@@ -40,7 +45,6 @@
protected volatile IFloodlightProviderService floodlightProvider;
protected volatile IControllerRegistryService registryService;
-
protected FlowPusher pusher;
private static final int NUM_PUSHER_THREAD = 1;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index ba3602c..6f85c5d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -40,17 +40,20 @@
import net.onrc.onos.ofcontroller.util.Port;
/**
- * FlowPusher intermediates FlowManager/FlowSynchronizer and switches to push OpenFlow
- * messages to switches in proper rate.
+ * FlowPusher is a implementation of FlowPusherService.
+ * FlowPusher assigns one message queue instance for each one switch.
+ * Number of message processing threads is configurable by constructor, and
+ * one thread can handle multiple message queues. Each queue will be assigned to
+ * a thread according to hash function defined by getHash().
+ * Each processing thread reads messages from queues and sends it to switches
+ * in round-robin. Processing thread also calculates rate of sending to suppress
+ * excessive message sending.
* @author Naoki Shiota
*
*/
public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
- // If this flag is true, barrier message will be sent when queue goes empty.
- private static boolean barrierIfEmpty = false;
-
// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
@@ -71,7 +74,7 @@
}
/**
- * Message queue attached to a switch.
+ * SwitchQueue represents message queue attached to a switch.
* This consists of queue itself and variables used for limiting sending rate.
* @author Naoki Shiota
*
@@ -104,8 +107,7 @@
}
// Check if sufficient time (from aspect of rate) elapsed or not.
- long interval = (current - last_sent_time) / 1000000;
- long rate = last_sent_size / interval;
+ long rate = last_sent_size / (current - last_sent_time);
return (rate < max_rate);
}
@@ -126,7 +128,10 @@
private FloodlightContext context = null;
private BasicFactory factory = null;
+
+ // Map of threads versus dpid
private Map<Long, FlowPusherThread> threadMap = null;
+ // Map of Future objects versus dpid and transaction ID.
private Map<Long, Map<Integer, OFBarrierReplyFuture>>
barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
@@ -139,14 +144,12 @@
*/
private class FlowPusherThread extends Thread {
private Map<IOFSwitch,SwitchQueue> queues
- = new HashMap<IOFSwitch,SwitchQueue>();
+ = new HashMap<IOFSwitch,SwitchQueue>();
private Semaphore mutex = new Semaphore(0);
@Override
public void run() {
- log.debug("Begin Flow Pusher Process");
-
while (true) {
try {
// wait for message pushed to queue
@@ -173,7 +176,7 @@
}
// check sending rate and determine it to be sent or not
- long current_time = System.nanoTime();
+ long current_time = System.currentTimeMillis();
long size = 0;
synchronized (queue) {
@@ -202,9 +205,7 @@
queue.logSentData(current_time, size);
if (queue.isEmpty()) {
- if (barrierIfEmpty) {
- barrier(sw);
- }
+ // remove queue if flagged to be.
if (queue.toBeDeleted) {
synchronized (queues) {
queues.remove(sw);
@@ -255,7 +256,7 @@
if (damper != null) {
messageDamper = damper;
} else {
- // use default value
+ // use default values
messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
EnumSet.of(OFType.FLOW_MOD),
OFMESSAGE_DAMPER_TIMEOUT);
@@ -280,10 +281,6 @@
}
}
- /**
- * Suspend sending messages to switch.
- * @param sw
- */
@Override
public boolean suspend(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -301,9 +298,6 @@
}
}
- /**
- * Resume sending messages to switch.
- */
@Override
public boolean resume(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -321,9 +315,6 @@
}
}
- /**
- * Check if given switch is suspended.
- */
@Override
public boolean isSuspended(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -384,10 +375,10 @@
}
@Override
- public boolean deleteQueue(IOFSwitch sw, boolean force) {
+ public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
FlowPusherThread proc = getProcess(sw);
- if (force) {
+ if (forceStop) {
synchronized (proc.queues) {
SwitchQueue queue = proc.queues.remove(sw);
if (queue == null) {
@@ -407,18 +398,12 @@
}
}
- /**
- * Add OFMessage to queue of the switch.
- * @param sw Switch to which message is sent.
- * @param msg Message to be sent.
- * @return true if succeed.
- */
@Override
public boolean add(IOFSwitch sw, OFMessage msg) {
FlowPusherThread proc = getProcess(sw);
SwitchQueue queue = proc.queues.get(sw);
- // create queue at first call
+ // create queue at first addition of message
if (queue == null) {
createQueue(sw);
queue = getQueue(sw);
@@ -436,13 +421,6 @@
return true;
}
- /**
- * Create OFMessage from given flow information and add it to the queue.
- * @param sw Switch to which message is sent.
- * @param flowObj FlowPath.
- * @param flowEntryObj FlowEntry.
- * @return true if succeed.
- */
@Override
public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
log.debug("sending : {}, {}", sw, flowObj);
@@ -748,13 +726,6 @@
return true;
}
- /**
- * Create OFMessage from given flow information and add it to the queue.
- * @param sw Switch to which message is sent.
- * @param flowPath FlowPath.
- * @param flowEntry FlowEntry.
- * @return true if succeed.
- */
@Override
public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
//
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index 4f3fced..0d6b0e8 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -29,11 +29,10 @@
import net.onrc.onos.ofcontroller.util.FlowEntryId;
/**
- * FlowSynchronizer is a sub-module of FlowProgrammer to keep switches' flow table
- * synchronized with GraphDB. FlowSynchronizer periodically read flow tables from
+ * FlowSynchronizer is an implementation of FlowSyncService.
+ * In addition to IFlowSyncService, FlowSynchronizer periodically reads flow tables from
* switches and compare them with GraphDB to drop unnecessary flows and/or to install
- * missing flows. FlowSynchronizer also watch the event of addition/deletion of switches
- * and start synchronization.
+ * missing flows.
* @author Brian
*
*/
@@ -52,7 +51,7 @@
@Override
public void synchronize(IOFSwitch sw) {
- Synchroizer sync = new Synchroizer(sw);
+ Synchronizer sync = new Synchronizer(sw);
Thread t = new Thread(sync);
switchThreads.put(sw, t);
t.start();
@@ -66,6 +65,10 @@
}
}
+ /**
+ * Initialize Synchronizer.
+ * @param pusherService FlowPusherService used for sending messages.
+ */
public void init(IFlowPusherService pusherService) {
pusher = pusherService;
}
@@ -75,11 +78,11 @@
* @author Brian
*
*/
- protected class Synchroizer implements Runnable {
+ protected class Synchronizer implements Runnable {
IOFSwitch sw;
ISwitchObject swObj;
- public Synchroizer(IOFSwitch sw) {
+ public Synchronizer(IOFSwitch sw) {
this.sw = sw;
Dpid dpid = new Dpid(sw.getId());
this.swObj = dbHandler.searchSwitch(dpid.toString());
@@ -210,7 +213,7 @@
/**
* Install this FlowEntry to a switch via FlowPusher.
- * @param sw
+ * @param sw Switch to which flow will be installed.
*/
public void addToSwitch(IOFSwitch sw) {
if(iflowEntry != null) {
@@ -224,7 +227,7 @@
/**
* Remove this FlowEntry from a switch via FlowPusher.
- * @param sw
+ * @param sw Switch from which flow will be removed.
*/
public void removeFromSwitch(IOFSwitch sw){
if(iflowEntry != null) {
@@ -258,6 +261,7 @@
* the same value; otherwise, returns false.
*
* @param Object to compare
+ * @return true if the object has the same Flow Entry ID.
*/
@Override
public boolean equals(Object obj){
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index 570120f..20a6249 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -11,6 +11,15 @@
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowPath;
+/**
+ * FlowPusherService is a service to send message to switches in proper rate.
+ * Conceptually a queue is attached to each switch, and FlowPusherService
+ * read a message from queue and send it to switch in order.
+ * To guarantee message has been installed, FlowPusherService can add barrier
+ * message to queue and can notify when barrier message is sent to switch.
+ * @author Naoki Shiota
+ *
+ */
public interface IFlowPusherService extends IFloodlightService {
/**
* Create a queue correspondent to the switch.
@@ -31,11 +40,13 @@
* Delete a queue correspondent to the switch.
* By setting force flag on, queue will be deleted immediately.
* @param sw Switch of which queue is deleted.
- * @param force If this flag is on, queue will be deleted immediately
- * regardless of any messages in the queue.
- * @return true if queue is successfully deleted.
+ * @param forceStop If this flag is set to true, queue will be deleted
+ * immediately regardless of any messages in the queue.
+ * If false, all messages will be sent to switch and queue will
+ * be deleted after that.
+ * @return true if queue is successfully deleted or flagged to be deleted.
*/
- boolean deleteQueue(IOFSwitch sw, boolean force);
+ boolean deleteQueue(IOFSwitch sw, boolean forceStop);
/**
* Add a message to the queue of the switch.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
index 2b9d51d..4e6efaf 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
@@ -5,7 +5,9 @@
import net.floodlightcontroller.core.module.IFloodlightService;
/**
- * @author bocon
+ * FlowSyncService is a service to synchronize GraphDB and switch's flow table.
+ * FlowSyncService offers APIs to trigger and interrupt synchronization explicitly.
+ * @author Brian
*
*/
public interface IFlowSyncService extends IFloodlightService {