Added JavaDoc comments for FlowPusher and FlowSynchronizer
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index 45f59b3..bd54755 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -22,10 +22,19 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.registry.controller.IControllerRegistryService;
+/**
+ * FlowProgrammer is a module responsible to maintain flows installed to switches.
+ * FlowProgrammer consists of FlowPusher and FlowSynchronizer.
+ * FlowPusher manages the rate of installation, and FlowSynchronizer synchronizez
+ * flows between GraphDB and switches.
+ * @author Brian
+ *
+ */
public class FlowProgrammer implements IFloodlightModule,
IOFMessageListener,
IOFSwitchListener {
@SuppressWarnings("unused")
+ // flag to enable FlowSynchronizer
private static final boolean enableFlowSync = false;
protected static Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
protected volatile IFloodlightProviderService floodlightProvider;
@@ -36,7 +45,7 @@
private static final int NUM_PUSHER_THREAD = 1;
protected FlowSynchronizer synchronizer;
-
+
public FlowProgrammer() {
pusher = new FlowPusher(NUM_PUSHER_THREAD);
if (enableFlowSync) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index f43a83e..ba3602c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -48,6 +48,7 @@
public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+ // If this flag is true, barrier message will be sent when queue goes empty.
private static boolean barrierIfEmpty = false;
// NOTE: Below are moved from FlowManager.
@@ -57,10 +58,6 @@
protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
- // Interval of sleep when queue is empty
- protected static final long SLEEP_MILLI_SEC = 10;
- protected static final int SLEEP_NANO_SEC = 0;
-
// Number of messages sent to switch at once
protected static final int MAX_MESSAGE_SEND = 100;
@@ -83,11 +80,14 @@
private class SwitchQueue extends ArrayDeque<OFMessage> {
QueueState state;
- // Max rate of sending message (bytes/sec). 0 implies no limitation.
+ // Max rate of sending message (bytes/ms). 0 implies no limitation.
long max_rate = 0; // 0 indicates no limitation
long last_sent_time = 0;
long last_sent_size = 0;
+ // "To be deleted" flag
+ boolean toBeDeleted = false;
+
/**
* Check if sending rate is within the rate
* @param current Current time
@@ -104,7 +104,8 @@
}
// Check if sufficient time (from aspect of rate) elapsed or not.
- long rate = last_sent_size / (current - last_sent_time);
+ long interval = (current - last_sent_time) / 1000000;
+ long rate = last_sent_size / interval;
return (rate < max_rate);
}
@@ -200,8 +201,15 @@
sw.flush();
queue.logSentData(current_time, size);
- if (queue.isEmpty() && barrierIfEmpty) {
- barrier(sw);
+ if (queue.isEmpty()) {
+ if (barrierIfEmpty) {
+ barrier(sw);
+ }
+ if (queue.toBeDeleted) {
+ synchronized (queues) {
+ queues.remove(sw);
+ }
+ }
}
}
}
@@ -341,11 +349,7 @@
}
}
- /**
- * Set sending rate to a switch.
- * @param sw Switch.
- * @param rate Rate in bytes/sec.
- */
+ @Override
public void setRate(IOFSwitch sw, long rate) {
SwitchQueue queue = getQueue(sw);
if (queue == null) {
@@ -356,6 +360,52 @@
queue.max_rate = rate;
}
}
+
+ @Override
+ public boolean createQueue(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue != null) {
+ return false;
+ }
+
+ FlowPusherThread proc = getProcess(sw);
+ queue = new SwitchQueue();
+ queue.state = QueueState.READY;
+ synchronized (proc) {
+ proc.queues.put(sw, queue);
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw) {
+ return deleteQueue(sw, false);
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw, boolean force) {
+ FlowPusherThread proc = getProcess(sw);
+
+ if (force) {
+ synchronized (proc.queues) {
+ SwitchQueue queue = proc.queues.remove(sw);
+ if (queue == null) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ return false;
+ }
+ synchronized (queue) {
+ queue.toBeDeleted = true;
+ }
+ return true;
+ }
+ }
/**
* Add OFMessage to queue of the switch.
@@ -368,12 +418,10 @@
FlowPusherThread proc = getProcess(sw);
SwitchQueue queue = proc.queues.get(sw);
+ // create queue at first call
if (queue == null) {
- queue = new SwitchQueue();
- queue.state = QueueState.READY;
- synchronized (proc) {
- proc.queues.put(sw, queue);
- }
+ createQueue(sw);
+ queue = getQueue(sw);
}
synchronized (queue) {
@@ -705,7 +753,7 @@
* @param sw Switch to which message is sent.
* @param flowPath FlowPath.
* @param flowEntry FlowEntry.
- * @return true if secceed.
+ * @return true if succeed.
*/
@Override
public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
@@ -1061,12 +1109,16 @@
barrierFutures.put(sw.getId(), map);
}
map.put(msg.getXid(), future);
- log.debug("Inserted future for {}", msg.getXid());
}
return future;
}
+ /**
+ * Get a queue attached to a switch.
+ * @param sw Switch object
+ * @return Queue object
+ */
protected SwitchQueue getQueue(IOFSwitch sw) {
if (sw == null) {
return null;
@@ -1075,12 +1127,22 @@
return getProcess(sw).queues.get(sw);
}
+ /**
+ * Get a hash value correspondent to a switch.
+ * @param sw Switch object
+ * @return Hash value
+ */
protected long getHash(IOFSwitch sw) {
// This code assumes DPID is sequentially assigned.
// TODO consider equalization algorithm
return sw.getId() % number_thread;
}
-
+
+ /**
+ * Get a Thread object which processes the queue attached to a switch.
+ * @param sw Switch object
+ * @return Thread object
+ */
protected FlowPusherThread getProcess(IOFSwitch sw) {
long hash = getHash(sw);
@@ -1119,4 +1181,5 @@
return Command.CONTINUE;
}
+
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index 7bda71f..3250eb4 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -28,6 +28,15 @@
import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
+/**
+ * FlowSynchronizer is a sub-module of FlowProgrammer to keep switches' flow table
+ * synchronized with GraphDB. FlowSynchronizer periodically read flow tables from
+ * switches and compare them with GraphDB to drop unnecessary flows and/or to install
+ * missing flows. FlowSynchronizer also watch the event of addition/deletion of switches
+ * and start synchronization.
+ * @author Brian
+ *
+ */
public class FlowSynchronizer implements IFlowSyncService {
private static Logger log = LoggerFactory.getLogger(FlowSynchronizer.class);
@@ -61,7 +70,12 @@
pusher = pusherService;
}
- protected class Synchronizer implements Runnable {
+ /**
+ * Synchronizer represents main thread of synchronization.
+ * @author Brian
+ *
+ */
+ protected class Synchronizer implements Runnable {
IOFSwitch sw;
ISwitchObject swObj;
@@ -81,6 +95,12 @@
//pusher.resume(sw);
}
+ /**
+ * Compare flows entries in GraphDB and switch to pick up necessary messages.
+ * After picking up, picked messages are added to FlowPusher.
+ * @param graphEntries Flow entries in GraphDB.
+ * @param switchEntries Flow entries in switch.
+ */
private void compare(Set<FlowEntryWrapper> graphEntries, Set<FlowEntryWrapper> switchEntries) {
int added = 0, removed = 0, skipped = 0;
for(FlowEntryWrapper entry : switchEntries) {
@@ -104,6 +124,10 @@
"Flow entries skipped " + skipped);
}
+ /**
+ * Read GraphDB to get FlowEntries associated with a switch.
+ * @return set of FlowEntries
+ */
private Set<FlowEntryWrapper> getFlowEntriesFromGraph() {
Set<FlowEntryWrapper> entries = new HashSet<FlowEntryWrapper>();
for(IFlowEntry entry : swObj.getFlowEntries()) {
@@ -113,6 +137,10 @@
return entries;
}
+ /**
+ * Read flow table from switch and derive FlowEntries from table.
+ * @return set of FlowEntries
+ */
private Set<FlowEntryWrapper> getFlowEntriesFromSwitch() {
int lengthU = 0;
@@ -159,6 +187,12 @@
}
+ /**
+ * FlowEntryWrapper represents abstract FlowEntry which is embodied by IFlowEntry
+ * (from GraphDB) or OFFlowStatisticsReply (from switch).
+ * @author Brian
+ *
+ */
class FlowEntryWrapper {
FlowEntryId id;
IFlowEntry iflowEntry;
@@ -174,6 +208,10 @@
id = new FlowEntryId(entry.getCookie());
}
+ /**
+ * Install this FlowEntry to a switch via FlowPusher.
+ * @param sw
+ */
public void addToSwitch(IOFSwitch sw) {
if(iflowEntry != null) {
pusher.add(sw, iflowEntry.getFlow(), iflowEntry);
@@ -184,6 +222,10 @@
}
}
+ /**
+ * Remove this FlowEntry from a switch via FlowPusher.
+ * @param sw
+ */
public void removeFromSwitch(IOFSwitch sw){
if(iflowEntry != null) {
log.error("Removing non-existent flow entry {} from sw {}",
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index 94d6e35..570120f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -13,6 +13,31 @@
public interface IFlowPusherService extends IFloodlightService {
/**
+ * Create a queue correspondent to the switch.
+ * @param sw Switch to which new queue is attached.
+ * @return true if new queue is successfully created.
+ */
+ boolean createQueue(IOFSwitch sw);
+
+ /**
+ * Delete a queue correspondent to the switch.
+ * Messages remains in queue will be all sent before queue is deleted.
+ * @param sw Switch of which queue is deleted.
+ * @return true if queue is successfully deleted.
+ */
+ boolean deleteQueue(IOFSwitch sw);
+
+ /**
+ * Delete a queue correspondent to the switch.
+ * By setting force flag on, queue will be deleted immediately.
+ * @param sw Switch of which queue is deleted.
+ * @param force If this flag is on, queue will be deleted immediately
+ * regardless of any messages in the queue.
+ * @return true if queue is successfully deleted.
+ */
+ boolean deleteQueue(IOFSwitch sw, boolean force);
+
+ /**
* Add a message to the queue of the switch.
* @param sw Switch to which message is pushed.
* @param msg Message object to be added.
@@ -39,6 +64,13 @@
boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj);
/**
+ * Set sending rate to a switch.
+ * @param sw Switch.
+ * @param rate Rate in bytes/ms.
+ */
+ public void setRate(IOFSwitch sw, long rate);
+
+ /**
* Add BARRIER message to queue and wait for reply.
* @param sw Switch to which barrier message is pushed.
* @return BARRIER_REPLY message sent from switch.