Finished implementing FlowPusherService.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 42f5e3c..a3f602f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -18,7 +18,6 @@
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
-
import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.graph.GraphDBOperation;
import net.onrc.onos.ofcontroller.core.INetMapStorage;
@@ -31,13 +30,15 @@
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.*;
+import org.openflow.protocol.OFMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Flow Manager class for handling the network flows.
*/
-public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
+public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage,
+ IFlowPusherService {
protected GraphDBOperation dbHandler;
@@ -106,6 +107,8 @@
Iterable<IFlowEntry> allFlowEntries =
dbHandler.getAllSwitchNotUpdatedFlowEntries();
for (IFlowEntry flowEntryObj : allFlowEntries) {
+ log.debug("flowEntryobj : {}", flowEntryObj);
+
counterAllFlowEntries++;
String dpidStr = flowEntryObj.getSwitchDpid();
@@ -140,6 +143,8 @@
}
counterMyNotUpdatedFlowEntries++;
}
+
+ log.debug("addFlowEntries : {}", addFlowEntries);
//
// Process the Flow Entries that need to be added
@@ -443,12 +448,11 @@
pusher = new FlowPusher();
pusher.init(null, floodlightProvider.getOFMessageFactory(), null);
+
this.init("");
mapReaderScheduler = Executors.newScheduledThreadPool(1);
shortestPathReconcileScheduler = Executors.newScheduledThreadPool(1);
-
- pusher.start();
}
/**
@@ -486,6 +490,8 @@
// Initialize the Flow Entry ID generator
nextFlowEntryIdPrefix = randomGenerator.nextInt();
+ pusher.start();
+
//
// Create the Path Computation thread and register it with the
// Datagrid Service
@@ -779,7 +785,6 @@
*/
private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
IFlowEntry flowEntryObj) {
- log.debug("Flow is sent to pusher : dpid({}) flow_id({})", mySwitch.getId(), flowEntryObj.getFlowEntryId());
return pusher.send(mySwitch, flowObj, flowEntryObj);
}
@@ -907,4 +912,44 @@
dbHandler.commit();
}
+
+ @Override
+ public void addMessage(long dpid, OFMessage msg) {
+ IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
+ if (sw == null) {
+ return;
+ }
+
+ pusher.send(sw, msg);
+ }
+
+ @Override
+ public boolean suspend(long dpid) {
+ IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
+ if (sw == null) {
+ return false;
+ }
+
+ return pusher.suspend(sw);
+ }
+
+ @Override
+ public boolean resume(long dpid) {
+ IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
+ if (sw == null) {
+ return false;
+ }
+
+ return pusher.resume(sw);
+ }
+
+ @Override
+ public boolean isSuspended(long dpid) {
+ IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
+ if (sw == null) {
+ return false;
+ }
+
+ return pusher.isSuspended(sw);
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
index 2511a3f..67d84e2 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
@@ -47,8 +47,6 @@
public static final short PRIORITY_DEFAULT = 100;
public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
-
-
public enum QueueState {
READY,
@@ -59,7 +57,7 @@
QueueState state;
// Max rate of sending message (bytes/sec). 0 implies no limitation.
- long max_rate = Long.MAX_VALUE;
+ long max_rate = 0; // 0 indicates no limitation
long last_sent_time = 0;
long last_sent_size = 0;
@@ -69,6 +67,11 @@
* @return true if within the rate
*/
boolean isSendable(long current) {
+ if (max_rate == 0) {
+ // no limitation
+ return true;
+ }
+
long rate = last_sent_size / (current - last_sent_time);
if (rate < max_rate) {
@@ -122,14 +125,11 @@
continue;
}
+ // check sending rate and determine it to be sent or not
+ long current_time = System.nanoTime();
+
synchronized (queue) {
- log.debug("Queue size : {}", queue.size());
-
- // check sending rate and determine it to be sent or not
- long current_time = System.nanoTime();
-
if (queue.isSendable(current_time)) {
- // TODO send multiple messages at once
while (! queue.isEmpty()) {
OFMessage msg = queue.poll();
@@ -159,6 +159,7 @@
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
+ log.error("Thread.sleep failed");
}
}
@@ -168,6 +169,7 @@
log.debug("Pusher Process finished.");
return;
}
+
}
}
}
@@ -202,36 +204,51 @@
* Suspend processing a queue related to given switch.
* @param sw
*/
- public void suspend(IOFSwitch sw) {
+ public boolean suspend(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
if (queue == null) {
- return;
+ return false;
}
synchronized (queue) {
if (queue.state == QueueState.READY) {
queue.state = QueueState.SUSPENDED;
+ return true;
}
+ return false;
}
}
/**
* Resume processing a queue related to given switch.
*/
- public void resume(IOFSwitch sw) {
+ public boolean resume(IOFSwitch sw) {
SwitchQueue queue = getQueue(sw);
if (queue == null) {
- return;
+ return false;
}
synchronized (queue) {
if (queue.state == QueueState.SUSPENDED) {
queue.state = QueueState.READY;
+ return true;
}
+ return false;
}
}
+
+ public boolean isSuspended(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ // TODO Is true suitable for this case?
+ return true;
+ }
+
+ return (queue.state == QueueState.SUSPENDED);
+ }
/**
* End processing queue and exit thread.
@@ -268,6 +285,8 @@
}
}
+ log.debug("Message is pushed : {}", msg);
+
synchronized (queue) {
queue.add(msg);
}
@@ -285,6 +304,7 @@
* @return
*/
public boolean send(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
+ log.debug("sending : {}, {}", sw, flowObj);
String flowEntryIdStr = flowEntryObj.getFlowEntryId();
if (flowEntryIdStr == null)
return false;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowPusherService.java
new file mode 100644
index 0000000..830d47f
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowPusherService.java
@@ -0,0 +1,37 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.module.IFloodlightService;
+
+import org.openflow.protocol.OFMessage;
+
+public interface IFlowPusherService extends IFloodlightService {
+ /**
+ * Add a message to the queue of a switch.
+ * @param sw
+ * @param msg
+ * @return
+ */
+ void addMessage(long dpid, OFMessage msg);
+
+ /**
+ * Suspend pushing message to a switch.
+ * @param sw
+ * @return true if success
+ */
+ boolean suspend(long dpid);
+
+ /**
+ * Resume pushing message to a switch.
+ * @param sw
+ * @return true if success
+ */
+ boolean resume(long dpid);
+
+ /**
+ * Get whether pushing of message is suspended or not.
+ * @param sw
+ * @return true if suspended
+ */
+ boolean isSuspended(long dpid);
+}