Added barrier function to FlowPusher.
Fixed division by zero in FlowPusher.
Replaced Runnable in FlowPusher by Thread.
Replaced Thread.sleep() in FlowPusher by Semaphore.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index ebc86e4..b567a87 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -5,15 +5,20 @@
import java.util.HashMap;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.threadpool.IThreadPoolService;
public class FlowProgrammer implements IFloodlightModule {
- private static final boolean enableFlowSync = true;
+ @SuppressWarnings("unused")
+ private final static Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
+
+ private static final boolean enableFlowSync = false;
protected volatile IFloodlightProviderService floodlightProvider;
@@ -79,5 +84,4 @@
return l;
}
-
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 6379f26..f43a83e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -9,6 +9,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
import org.openflow.protocol.*;
import org.openflow.protocol.action.*;
@@ -47,6 +48,8 @@
public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+ private static boolean barrierIfEmpty = false;
+
// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
@@ -96,11 +99,20 @@
return true;
}
+ if (current == last_sent_time) {
+ return false;
+ }
+
// Check if sufficient time (from aspect of rate) elapsed or not.
long rate = last_sent_size / (current - last_sent_time);
return (rate < max_rate);
}
+ /**
+ * Log time and size of last sent data.
+ * @param current Time to be sent.
+ * @param size Size of sent data (in bytes).
+ */
void logSentData(long current, long size) {
last_sent_time = current;
last_sent_size = size;
@@ -113,7 +125,7 @@
private FloodlightContext context = null;
private BasicFactory factory = null;
- private Map<Long, FlowPusherProcess> threadMap = null;
+ private Map<Long, FlowPusherThread> threadMap = null;
private Map<Long, Map<Integer, OFBarrierReplyFuture>>
barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
@@ -124,26 +136,31 @@
* @author Naoki Shiota
*
*/
- private class FlowPusherProcess implements Runnable {
+ private class FlowPusherThread extends Thread {
private Map<IOFSwitch,SwitchQueue> queues
= new HashMap<IOFSwitch,SwitchQueue>();
- private boolean isStopped = false;
- private boolean isMsgAdded = false;
+ private Semaphore mutex = new Semaphore(0);
@Override
public void run() {
log.debug("Begin Flow Pusher Process");
while (true) {
+ try {
+ // wait for message pushed to queue
+ mutex.acquire();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ log.debug("FlowPusherThread is interrupted");
+ return;
+ }
+
Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
synchronized (queues) {
entries = queues.entrySet();
}
- // Set taint flag to false at this moment.
- isMsgAdded = false;
-
for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
IOFSwitch sw = entry.getKey();
SwitchQueue queue = entry.getValue();
@@ -163,15 +180,14 @@
int i = 0;
while (! queue.isEmpty()) {
// Number of messages excess the limit
- if (++i >= MAX_MESSAGE_SEND) {
+ if (i >= MAX_MESSAGE_SEND) {
// Messages remains in queue
- isMsgAdded = true;
+ mutex.release();
break;
}
+ ++i;
OFMessage msg = queue.poll();
-
- // if need to send, call IOFSwitch#write()
try {
messageDamper.write(sw, msg, context);
log.debug("Pusher sends message : {}", msg);
@@ -183,27 +199,13 @@
}
sw.flush();
queue.logSentData(current_time, size);
+
+ if (queue.isEmpty() && barrierIfEmpty) {
+ barrier(sw);
+ }
}
}
}
-
- // sleep while all queues are empty
- while (! (isMsgAdded || isStopped)) {
- try {
- Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
- } catch (InterruptedException e) {
- e.printStackTrace();
- log.error("Thread.sleep failed");
- }
- }
-
- log.debug("Exit sleep loop.");
-
- if (isStopped) {
- log.debug("Pusher Process finished.");
- return;
- }
-
}
}
}
@@ -261,12 +263,11 @@
return;
}
- threadMap = new HashMap<Long,FlowPusherProcess>();
+ threadMap = new HashMap<Long,FlowPusherThread>();
for (long i = 0; i < number_thread; ++i) {
- FlowPusherProcess runnable = new FlowPusherProcess();
- threadMap.put(i, runnable);
+ FlowPusherThread thread = new FlowPusherThread();
- Thread thread = new Thread(runnable);
+ threadMap.put(i, thread);
thread.start();
}
}
@@ -335,10 +336,8 @@
return;
}
- for (FlowPusherProcess runnable : threadMap.values()) {
- if (! runnable.isStopped) {
- runnable.isStopped = true;
- }
+ for (FlowPusherThread t : threadMap.values()) {
+ t.interrupt();
}
}
@@ -366,7 +365,7 @@
*/
@Override
public boolean add(IOFSwitch sw, OFMessage msg) {
- FlowPusherProcess proc = getProcess(sw);
+ FlowPusherThread proc = getProcess(sw);
SwitchQueue queue = proc.queues.get(sw);
if (queue == null) {
@@ -382,8 +381,10 @@
log.debug("Message is pushed : {}", msg);
}
- proc.isMsgAdded = true;
-
+ if (proc.mutex.availablePermits() == 0) {
+ proc.mutex.release();
+ }
+
return true;
}
@@ -1041,14 +1042,18 @@
@Override
public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
// TODO creation of message and future should be moved to OFSwitchImpl
+
+ if (sw == null) {
+ return null;
+ }
- OFMessage msg = factory.getMessage(OFType.BARRIER_REQUEST);
+ OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
msg.setXid(sw.getNextTransactionId());
add(sw, msg);
// TODO create Future object of message
OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
-
+
synchronized (barrierFutures) {
Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
if (map == null) {
@@ -1056,7 +1061,7 @@
barrierFutures.put(sw.getId(), map);
}
map.put(msg.getXid(), future);
- log.debug("Created future for {}", msg.getXid());
+ log.debug("Inserted future for {}", msg.getXid());
}
return future;
@@ -1076,7 +1081,7 @@
return sw.getId() % number_thread;
}
- protected FlowPusherProcess getProcess(IOFSwitch sw) {
+ protected FlowPusherThread getProcess(IOFSwitch sw) {
long hash = getHash(sw);
return threadMap.get(hash);
@@ -1099,11 +1104,6 @@
@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
- // This check can be skipped (since Controller must filter).
-// if (msg.getType() != OFType.BARRIER_REPLY) {
-// return Command.CONTINUE;
-// }
-
Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
if (map == null) {
return Command.CONTINUE;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index b2e4552..f3f5c50 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -2,7 +2,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -13,23 +12,8 @@
import org.openflow.protocol.OFFlowMod;
import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFPacketOut;
import org.openflow.protocol.OFPort;
import org.openflow.protocol.OFStatisticsRequest;
-import org.openflow.protocol.action.OFAction;
-import org.openflow.protocol.action.OFActionDataLayerDestination;
-import org.openflow.protocol.action.OFActionDataLayerSource;
-import org.openflow.protocol.action.OFActionEnqueue;
-import org.openflow.protocol.action.OFActionNetworkLayerDestination;
-import org.openflow.protocol.action.OFActionNetworkLayerSource;
-import org.openflow.protocol.action.OFActionNetworkTypeOfService;
-import org.openflow.protocol.action.OFActionOutput;
-import org.openflow.protocol.action.OFActionStripVirtualLan;
-import org.openflow.protocol.action.OFActionTransportLayerDestination;
-import org.openflow.protocol.action.OFActionTransportLayerSource;
-import org.openflow.protocol.action.OFActionVirtualLanIdentifier;
-import org.openflow.protocol.action.OFActionVirtualLanPriorityCodePoint;
import org.openflow.protocol.statistics.OFFlowStatisticsReply;
import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
import org.openflow.protocol.statistics.OFStatistics;
@@ -37,36 +21,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.tinkerpop.blueprints.Direction;
-
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.IOFSwitchListener;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
-import net.floodlightcontroller.core.module.IFloodlightModule;
-import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.restserver.IRestApiService;
-import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.graph.GraphDBOperation;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
-import net.onrc.onos.ofcontroller.core.module.IOnosService;
-import net.onrc.onos.ofcontroller.floodlightlistener.INetworkGraphService;
import net.onrc.onos.ofcontroller.util.Dpid;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction;
-import net.onrc.onos.ofcontroller.util.FlowEntryActions;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionEnqueue;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionOutput;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetEthernetAddr;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetIPv4Addr;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetIpToS;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetTcpUdpPort;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetVlanId;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetVlanPriority;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionStripVlan;
import net.onrc.onos.registry.controller.IControllerRegistryService;
public class FlowSynchronizer implements IFlowSyncService, IOFSwitchListener {
@@ -86,8 +50,8 @@
public void synchronize(IOFSwitch sw) {
Synchroizer sync = new Synchroizer(sw);
Thread t = new Thread(sync);
- t.start();
switchThread.put(sw, t);
+ t.start();
}
@Override
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
new file mode 100644
index 0000000..3013f5a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
@@ -0,0 +1,79 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import java.util.concurrent.TimeUnit;
+
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFType;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+
+/**
+ * Future for the OFBarrierReply that answers a single barrier request,
+ * identified by the transaction id given at construction.
+ */
+public class OFBarrierReplyFuture extends OFMessageFuture<OFBarrierReply> {
+
+    /** True once handleReply() has stored a reply. */
+    protected volatile boolean finished;
+
+    /**
+     * Creates a future without an explicit timeout.
+     * @param tp Thread pool service passed to the superclass.
+     * @param sw Switch the barrier request is sent to.
+     * @param transactionId Transaction id of the barrier request.
+     */
+    public OFBarrierReplyFuture(IThreadPoolService tp,
+            IOFSwitch sw, int transactionId) {
+        // The awaited response is a barrier reply, not a features reply.
+        super(tp, sw, OFType.BARRIER_REPLY, transactionId);
+        init();
+    }
+
+    /**
+     * Creates a future with an explicit timeout.
+     * @param tp Thread pool service passed to the superclass.
+     * @param sw Switch the barrier request is sent to.
+     * @param transactionId Transaction id of the barrier request.
+     * @param timeout Timeout value passed to the superclass.
+     * @param unit Unit of the timeout value.
+     */
+    public OFBarrierReplyFuture(IThreadPoolService tp,
+            IOFSwitch sw, int transactionId, long timeout, TimeUnit unit) {
+        super(tp, sw, OFType.BARRIER_REPLY, transactionId, timeout, unit);
+        init();
+    }
+
+    /** Resets the future to its initial, unanswered state. */
+    private void init() {
+        this.finished = false;
+        this.result = null;
+    }
+
+    /**
+     * Stores the received message as the result and marks this future finished.
+     * @param sw Switch the message came from (not used here).
+     * @param msg Received message; it is cast to OFBarrierReply.
+     */
+    @Override
+    protected void handleReply(IOFSwitch sw, OFMessage msg) {
+        this.result = (OFBarrierReply) msg;
+        this.finished = true;
+    }
+
+    /**
+     * @return true once a reply has been stored by handleReply().
+     */
+    @Override
+    protected boolean isFinished() {
+        return finished;
+    }
+
+    /** Delegates to the superclass; no barrier-specific cleanup is done. */
+    @Override
+    protected void unRegister() {
+        super.unRegister();
+    }
+}