Merge branch 'master' of https://github.com/OPENNETWORKINGLAB/ONOS
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index 45f59b3..33c9a6a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -22,21 +22,34 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.registry.controller.IControllerRegistryService;
+/**
+ * FlowProgrammer is a module responsible for maintaining the flows installed on switches.
+ * FlowProgrammer consists of FlowPusher and FlowSynchronizer.
+ * FlowPusher manages the rate of installation, and FlowSynchronizer synchronizes
+ * flows between GraphDB and switches.
+ * FlowProgrammer also watches for the addition/deletion of switches in order to
+ * start/stop synchronization. When a switch is added to the network, FlowProgrammer
+ * immediately kicks off synchronization to bring the switch's flow table up to date.
+ * Conversely, when a switch is removed from the network, FlowProgrammer immediately
+ * stops synchronization.
+ * @author Brian
+ *
+ */
public class FlowProgrammer implements IFloodlightModule,
IOFMessageListener,
IOFSwitchListener {
@SuppressWarnings("unused")
+ // flag to enable FlowSynchronizer
private static final boolean enableFlowSync = false;
protected static Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
protected volatile IFloodlightProviderService floodlightProvider;
protected volatile IControllerRegistryService registryService;
-
protected FlowPusher pusher;
private static final int NUM_PUSHER_THREAD = 1;
protected FlowSynchronizer synchronizer;
-
+
public FlowProgrammer() {
pusher = new FlowPusher(NUM_PUSHER_THREAD);
if (enableFlowSync) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index f43a83e..6f85c5d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -40,16 +40,20 @@
import net.onrc.onos.ofcontroller.util.Port;
/**
- * FlowPusher intermediates FlowManager/FlowSynchronizer and switches to push OpenFlow
- * messages to switches in proper rate.
+ * FlowPusher is an implementation of FlowPusherService.
+ * FlowPusher assigns one message queue instance to each switch.
+ * The number of message processing threads is configurable via the constructor, and
+ * one thread can handle multiple message queues. Each queue is assigned to
+ * a thread according to the hash function defined by getHash().
+ * Each processing thread reads messages from its queues and sends them to switches
+ * in round-robin order. The processing thread also calculates the sending rate to suppress
+ * excessive message sending.
* @author Naoki Shiota
*
*/
public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
- private static boolean barrierIfEmpty = false;
-
// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
@@ -57,10 +61,6 @@
protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
- // Interval of sleep when queue is empty
- protected static final long SLEEP_MILLI_SEC = 10;
- protected static final int SLEEP_NANO_SEC = 0;
-
// Number of messages sent to switch at once
protected static final int MAX_MESSAGE_SEND = 100;
@@ -74,7 +74,7 @@
}
/**
- * Message queue attached to a switch.
+ * SwitchQueue represents a message queue attached to a switch.
* This consists of queue itself and variables used for limiting sending rate.
* @author Naoki Shiota
*
@@ -83,11 +83,14 @@
private class SwitchQueue extends ArrayDeque<OFMessage> {
QueueState state;
- // Max rate of sending message (bytes/sec). 0 implies no limitation.
+ // Maximum rate of sending messages (bytes/ms). 0 implies no limitation.
long max_rate = 0; // 0 indicates no limitation
long last_sent_time = 0;
long last_sent_size = 0;
+ // "To be deleted" flag
+ boolean toBeDeleted = false;
+
/**
* Check if sending rate is within the rate
* @param current Current time
@@ -125,7 +128,10 @@
private FloodlightContext context = null;
private BasicFactory factory = null;
+
+ // Map of threads versus dpid
private Map<Long, FlowPusherThread> threadMap = null;
+ // Map of Future objects versus dpid and transaction ID.
private Map<Long, Map<Integer, OFBarrierReplyFuture>>
barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
@@ -138,14 +144,12 @@
*/
private class FlowPusherThread extends Thread {
private Map<IOFSwitch,SwitchQueue> queues
- = new HashMap<IOFSwitch,SwitchQueue>();
+ = new HashMap<IOFSwitch,SwitchQueue>();
private Semaphore mutex = new Semaphore(0);
@Override
public void run() {
- log.debug("Begin Flow Pusher Process");
-
while (true) {
try {
// wait for message pushed to queue
@@ -172,7 +176,7 @@
}
// check sending rate and determine it to be sent or not
- long current_time = System.nanoTime();
+ long current_time = System.currentTimeMillis();
long size = 0;
synchronized (queue) {
@@ -200,8 +204,13 @@
sw.flush();
queue.logSentData(current_time, size);
- if (queue.isEmpty() && barrierIfEmpty) {
- barrier(sw);
+ if (queue.isEmpty()) {
+ // remove the queue if it has been flagged for deletion.
+ if (queue.toBeDeleted) {
+ synchronized (queues) {
+ queues.remove(sw);
+ }
+ }
}
}
}
@@ -247,7 +256,7 @@
if (damper != null) {
messageDamper = damper;
} else {
- // use default value
+ // use default values
messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
EnumSet.of(OFType.FLOW_MOD),
OFMESSAGE_DAMPER_TIMEOUT);
@@ -272,10 +281,6 @@
}
}
- /**
- * Suspend sending messages to switch.
- * @param sw
- */
@Override
public boolean suspend(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -293,9 +298,6 @@
}
}
- /**
- * Resume sending messages to switch.
- */
@Override
public boolean resume(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -313,9 +315,6 @@
}
}
- /**
- * Check if given switch is suspended.
- */
@Override
public boolean isSuspended(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
@@ -341,11 +340,7 @@
}
}
- /**
- * Set sending rate to a switch.
- * @param sw Switch.
- * @param rate Rate in bytes/sec.
- */
+ @Override
public void setRate(IOFSwitch sw, long rate) {
SwitchQueue queue = getQueue(sw);
if (queue == null) {
@@ -356,24 +351,62 @@
queue.max_rate = rate;
}
}
+ /** Registers an empty READY queue for {@code sw}; returns false if one already exists. */
+ @Override
+ public boolean createQueue(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue != null) {
+ return false;
+ }
+ // No queue yet: build one and hand it to the thread that processes this switch.
+ FlowPusherThread proc = getProcess(sw);
+ queue = new SwitchQueue();
+ queue.state = QueueState.READY;
+ synchronized (proc.queues) { // lock the map itself, as both queue-removal paths do
+ proc.queues.put(sw, queue);
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw) {
+ return deleteQueue(sw, false);
+ }
- /**
- * Add OFMessage to queue of the switch.
- * @param sw Switch to which message is sent.
- * @param msg Message to be sent.
- * @return true if succeed.
- */
+ @Override
+ public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
+ FlowPusherThread proc = getProcess(sw);
+
+ if (forceStop) {
+ synchronized (proc.queues) {
+ SwitchQueue queue = proc.queues.remove(sw);
+ if (queue == null) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ return false;
+ }
+ synchronized (queue) {
+ queue.toBeDeleted = true;
+ }
+ return true;
+ }
+ }
+
@Override
public boolean add(IOFSwitch sw, OFMessage msg) {
FlowPusherThread proc = getProcess(sw);
SwitchQueue queue = proc.queues.get(sw);
+ // create queue at first addition of message
if (queue == null) {
- queue = new SwitchQueue();
- queue.state = QueueState.READY;
- synchronized (proc) {
- proc.queues.put(sw, queue);
- }
+ createQueue(sw);
+ queue = getQueue(sw);
}
synchronized (queue) {
@@ -388,13 +421,6 @@
return true;
}
- /**
- * Create OFMessage from given flow information and add it to the queue.
- * @param sw Switch to which message is sent.
- * @param flowObj FlowPath.
- * @param flowEntryObj FlowEntry.
- * @return true if succeed.
- */
@Override
public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
log.debug("sending : {}, {}", sw, flowObj);
@@ -700,13 +726,6 @@
return true;
}
- /**
- * Create OFMessage from given flow information and add it to the queue.
- * @param sw Switch to which message is sent.
- * @param flowPath FlowPath.
- * @param flowEntry FlowEntry.
- * @return true if secceed.
- */
@Override
public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
//
@@ -1061,12 +1080,16 @@
barrierFutures.put(sw.getId(), map);
}
map.put(msg.getXid(), future);
- log.debug("Inserted future for {}", msg.getXid());
}
return future;
}
+ /**
+ * Get a queue attached to a switch.
+ * @param sw Switch object
+ * @return Queue object
+ */
protected SwitchQueue getQueue(IOFSwitch sw) {
if (sw == null) {
return null;
@@ -1075,12 +1098,22 @@
return getProcess(sw).queues.get(sw);
}
+ /**
+ * Get the hash value corresponding to a switch.
+ * @param sw Switch object
+ * @return Hash value
+ */
protected long getHash(IOFSwitch sw) {
// This code assumes DPID is sequentially assigned.
// TODO consider equalization algorithm
return sw.getId() % number_thread;
}
-
+
+ /**
+ * Get a Thread object which processes the queue attached to a switch.
+ * @param sw Switch object
+ * @return Thread object
+ */
protected FlowPusherThread getProcess(IOFSwitch sw) {
long hash = getHash(sw);
@@ -1119,4 +1152,5 @@
return Command.CONTINUE;
}
+
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index 7bda71f..0d6b0e8 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -28,6 +28,14 @@
import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
+/**
+ * FlowSynchronizer is an implementation of FlowSyncService.
+ * In addition to implementing IFlowSyncService, FlowSynchronizer periodically reads flow tables
+ * from switches and compares them with GraphDB to drop unnecessary flows and/or to install
+ * missing flows.
+ * @author Brian
+ *
+ */
public class FlowSynchronizer implements IFlowSyncService {
private static Logger log = LoggerFactory.getLogger(FlowSynchronizer.class);
@@ -57,11 +65,20 @@
}
}
+ /**
+ * Initialize Synchronizer.
+ * @param pusherService FlowPusherService used for sending messages.
+ */
public void init(IFlowPusherService pusherService) {
pusher = pusherService;
}
- protected class Synchronizer implements Runnable {
+ /**
+ * Synchronizer represents main thread of synchronization.
+ * @author Brian
+ *
+ */
+ protected class Synchronizer implements Runnable {
IOFSwitch sw;
ISwitchObject swObj;
@@ -81,6 +98,12 @@
//pusher.resume(sw);
}
+ /**
+ * Compare flow entries in GraphDB and the switch to pick out the necessary messages.
+ * After being picked out, the messages are added to FlowPusher.
+ * @param graphEntries Flow entries in GraphDB.
+ * @param switchEntries Flow entries in switch.
+ */
private void compare(Set<FlowEntryWrapper> graphEntries, Set<FlowEntryWrapper> switchEntries) {
int added = 0, removed = 0, skipped = 0;
for(FlowEntryWrapper entry : switchEntries) {
@@ -104,6 +127,10 @@
"Flow entries skipped " + skipped);
}
+ /**
+ * Read GraphDB to get FlowEntries associated with a switch.
+ * @return set of FlowEntries
+ */
private Set<FlowEntryWrapper> getFlowEntriesFromGraph() {
Set<FlowEntryWrapper> entries = new HashSet<FlowEntryWrapper>();
for(IFlowEntry entry : swObj.getFlowEntries()) {
@@ -113,6 +140,10 @@
return entries;
}
+ /**
+ * Read flow table from switch and derive FlowEntries from table.
+ * @return set of FlowEntries
+ */
private Set<FlowEntryWrapper> getFlowEntriesFromSwitch() {
int lengthU = 0;
@@ -159,6 +190,12 @@
}
+ /**
+ * FlowEntryWrapper represents abstract FlowEntry which is embodied by IFlowEntry
+ * (from GraphDB) or OFFlowStatisticsReply (from switch).
+ * @author Brian
+ *
+ */
class FlowEntryWrapper {
FlowEntryId id;
IFlowEntry iflowEntry;
@@ -174,6 +211,10 @@
id = new FlowEntryId(entry.getCookie());
}
+ /**
+ * Install this FlowEntry to a switch via FlowPusher.
+ * @param sw Switch to which flow will be installed.
+ */
public void addToSwitch(IOFSwitch sw) {
if(iflowEntry != null) {
pusher.add(sw, iflowEntry.getFlow(), iflowEntry);
@@ -184,6 +225,10 @@
}
}
+ /**
+ * Remove this FlowEntry from a switch via FlowPusher.
+ * @param sw Switch from which flow will be removed.
+ */
public void removeFromSwitch(IOFSwitch sw){
if(iflowEntry != null) {
log.error("Removing non-existent flow entry {} from sw {}",
@@ -216,6 +261,7 @@
* the same value; otherwise, returns false.
*
* @param Object to compare
+ * @return true if the object has the same Flow Entry ID.
*/
@Override
public boolean equals(Object obj){
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index 94d6e35..20a6249 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -11,8 +11,44 @@
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowPath;
+/**
+ * FlowPusherService is a service that sends messages to switches at a proper rate.
+ * Conceptually a queue is attached to each switch, and FlowPusherService
+ * reads messages from the queue and sends them to the switch in order.
+ * To guarantee that a message has been installed, FlowPusherService can add a barrier
+ * message to the queue and can notify when the barrier message is sent to the switch.
+ * @author Naoki Shiota
+ *
+ */
public interface IFlowPusherService extends IFloodlightService {
/**
+ * Create a queue corresponding to the switch.
+ * @param sw Switch to which the new queue is attached.
+ * @return true if the new queue is successfully created.
+ */
+ boolean createQueue(IOFSwitch sw);
+
+ /**
+ * Delete the queue corresponding to the switch.
+ * Messages remaining in the queue will all be sent before the queue is deleted.
+ * @param sw Switch whose queue is deleted.
+ * @return true if the queue is successfully deleted.
+ */
+ boolean deleteQueue(IOFSwitch sw);
+
+ /**
+ * Delete the queue corresponding to the switch.
+ * When the force flag is set, the queue is deleted immediately.
+ * @param sw Switch whose queue is deleted.
+ * @param forceStop If this flag is set to true, the queue will be deleted
+ * immediately regardless of any messages in the queue.
+ * If false, all messages will be sent to the switch and the queue will
+ * be deleted after that.
+ * @return true if the queue is successfully deleted or flagged to be deleted.
+ */
+ boolean deleteQueue(IOFSwitch sw, boolean forceStop);
+
+ /**
* Add a message to the queue of the switch.
* @param sw Switch to which message is pushed.
* @param msg Message object to be added.
@@ -39,6 +75,13 @@
boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj);
/**
+ * Set sending rate to a switch.
+ * @param sw Switch.
+ * @param rate Rate in bytes/ms.
+ */
+ public void setRate(IOFSwitch sw, long rate);
+
+ /**
* Add BARRIER message to queue and wait for reply.
* @param sw Switch to which barrier message is pushed.
* @return BARRIER_REPLY message sent from switch.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
index 2b9d51d..4e6efaf 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
@@ -5,7 +5,9 @@
import net.floodlightcontroller.core.module.IFloodlightService;
/**
- * @author bocon
+ * FlowSyncService is a service to synchronize GraphDB and switch's flow table.
+ * FlowSyncService offers APIs to trigger and interrupt synchronization explicitly.
+ * @author Brian
*
*/
public interface IFlowSyncService extends IFloodlightService {