Modified FlowManager to use FlowPusher instead of OFMessageDamper and FlowSwitchOperation.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index ecbb49c..42f5e3c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -18,7 +18,6 @@
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
-import net.floodlightcontroller.util.OFMessageDamper;
import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.graph.GraphDBOperation;
@@ -32,7 +31,6 @@
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.*;
-import org.openflow.protocol.OFType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,15 +48,8 @@
protected FloodlightModuleContext context;
protected PathComputation pathComputation;
- protected OFMessageDamper messageDamper;
-
- //
- // TODO: Values copied from elsewhere (class LearningSwitch).
- // The local copy should go away!
- //
- protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
- protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
-
+ protected FlowPusher pusher;
+
// Flow Entry ID generation state
private static Random randomGenerator = new Random();
private static int nextFlowEntryIdPrefix = 0;
@@ -387,6 +378,7 @@
public void close() {
datagridService.deregisterPathComputationService(pathComputation);
dbHandler.close();
+ pusher.stop();
}
/**
@@ -448,15 +440,15 @@
topologyNetService = context.getServiceImpl(ITopologyNetService.class);
datagridService = context.getServiceImpl(IDatagridService.class);
restApi = context.getServiceImpl(IRestApiService.class);
-
- messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
- EnumSet.of(OFType.FLOW_MOD),
- OFMESSAGE_DAMPER_TIMEOUT);
-
+
+ pusher = new FlowPusher();
+ pusher.init(null, floodlightProvider.getOFMessageFactory(), null);
this.init("");
mapReaderScheduler = Executors.newScheduledThreadPool(1);
shortestPathReconcileScheduler = Executors.newScheduledThreadPool(1);
+
+ pusher.start();
}
/**
@@ -787,9 +779,8 @@
*/
private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
IFlowEntry flowEntryObj) {
- return FlowSwitchOperation.installFlowEntry(
- floodlightProvider.getOFMessageFactory(),
- messageDamper, mySwitch, flowObj, flowEntryObj);
+ log.debug("Flow is sent to pusher : dpid({}) flow_id({})", mySwitch.getId(), flowEntryObj.getFlowEntryId());
+ return pusher.send(mySwitch, flowObj, flowEntryObj);
}
/**
@@ -802,9 +793,14 @@
*/
private boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
FlowEntry flowEntry) {
- return FlowSwitchOperation.installFlowEntry(
- floodlightProvider.getOFMessageFactory(),
- messageDamper, mySwitch, flowPath, flowEntry);
+ log.debug("Flow is sent to pusher : dpid({}) flow_id({})", mySwitch.getId(), flowEntry.getFlowId());
+ // TODO handle this installation by FlowPusher
+
+// return FlowSwitchOperation.installFlowEntry(
+// floodlightProvider.getOFMessageFactory(),
+// messageDamper, mySwitch, flowPath, flowEntry);
+
+ return true;
}
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
index 6c7a2bb..2511a3f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
@@ -7,6 +7,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.openflow.protocol.*;
import org.openflow.protocol.action.*;
@@ -39,10 +40,15 @@
//
protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
+
+ protected static final long SLEEP_MILLI_SEC = 3;
+ protected static final int SLEEP_NANO_SEC = 0;
public static final short PRIORITY_DEFAULT = 100;
public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
+
+
public enum QueueState {
READY,
@@ -53,7 +59,7 @@
QueueState state;
// Max rate of sending message (bytes/sec). 0 implies no limitation.
- long max_rate = 0;
+ long max_rate = Long.MAX_VALUE;
long last_sent_time = 0;
long last_sent_size = 0;
@@ -88,13 +94,25 @@
private BasicFactory factory = null;
private Thread thread = null;
+ private boolean isStopped = false;
+ private boolean isMsgAdded = false;
private class FlowPusherProcess implements Runnable {
+
@Override
public void run() {
+ log.debug("Begin Flow Pusher Process");
while (true) {
- for (Map.Entry<IOFSwitch,SwitchQueue> entry : queues.entrySet()) {
+ Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
+ synchronized (queues) {
+ entries = queues.entrySet();
+ }
+
+ // Set taint flag to false at this moment.
+ isMsgAdded = false;
+
+ for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
IOFSwitch sw = entry.getKey();
SwitchQueue queue = entry.getValue();
@@ -105,18 +123,21 @@
}
synchronized (queue) {
- // TODO send multiple messages at once
+ log.debug("Queue size : {}", queue.size());
- while (! queue.isEmpty()) {
- OFMessage msg = queue.poll();
- // check sending rate and determine it to be sent or not
- long current_time = System.nanoTime();
-
- // if need to send, call IOFSwitch#write()
- if (queue.isSendable(current_time)) {
+ // check sending rate and determine it to be sent or not
+ long current_time = System.nanoTime();
+
+ if (queue.isSendable(current_time)) {
+ // TODO send multiple messages at once
+ while (! queue.isEmpty()) {
+ OFMessage msg = queue.poll();
+
+ // if need to send, call IOFSwitch#write()
try {
messageDamper.write(sw, msg, context);
queue.updateRate(current_time, msg);
+ log.debug("Pusher sends message : {}", msg);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -127,28 +148,49 @@
}
// sleep while all queues are empty
- boolean sleep = true;
- do {
+ while (! isMsgAdded) {
+ if (isStopped) {
+ log.debug("Pusher Process finished.");
+ return;
+ }
- // TODO check if queues are empty
- } while (sleep);
+ try {
+ Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ log.debug("Exit sleep loop.");
+
+ if (isStopped) {
+ log.debug("Pusher Process finished.");
+ return;
+ }
}
}
}
- public void init(FloodlightContext context, BasicFactory factory) {
+ public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
this.context = context;
this.factory = factory;
- messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
- EnumSet.of(OFType.FLOW_MOD),
- OFMESSAGE_DAMPER_TIMEOUT);
+
+ if (damper != null) {
+ messageDamper = damper;
+ } else {
+ // use default value
+ messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ EnumSet.of(OFType.FLOW_MOD),
+ OFMESSAGE_DAMPER_TIMEOUT);
+ }
}
/**
* Begin processing queue.
*/
public void start() {
- if (context == null || factory == null) {
- // not yet initialized
+ if (factory == null) {
+ log.error("FlowPusher not yet initialized.");
return;
}
@@ -196,7 +238,18 @@
*/
public void stop() {
if (thread != null && thread.isAlive()) {
- // TODO tell thread to halt
+ isStopped = true;
+ }
+ }
+
+ public void setRate(IOFSwitch sw, long rate) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ return;
+ }
+
+ if (rate > 0) {
+ queue.max_rate = rate;
}
}
@@ -208,13 +261,19 @@
public boolean send(IOFSwitch sw, OFMessage msg) {
SwitchQueue queue = getQueue(sw);
if (queue == null) {
- queues.put(sw, new SwitchQueue());
+ queue = new SwitchQueue();
+ queue.state = QueueState.READY;
+ synchronized (queues) {
+ queues.put(sw, queue);
+ }
}
synchronized (queue) {
queue.add(msg);
}
+ isMsgAdded = true;
+
return true;
}