Merge pull request #457 from n-shiota/syncdev17
Added barrier handling feature to FlowPusher
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index a59a9f9..b567a87 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -4,6 +4,10 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
@@ -11,6 +15,9 @@
import net.floodlightcontroller.core.module.IFloodlightService;
public class FlowProgrammer implements IFloodlightModule {
+ @SuppressWarnings("unused")
+ private final static Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
+
private static final boolean enableFlowSync = false;
protected volatile IFloodlightProviderService floodlightProvider;
@@ -31,7 +38,7 @@
public void init(FloodlightModuleContext context)
throws FloodlightModuleException {
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
- pusher.init(null, floodlightProvider.getOFMessageFactory(), null);
+ pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
if (enableFlowSync) {
synchronizer.init(context);
}
@@ -77,5 +84,4 @@
return l;
}
-
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 532477a..f43a83e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -8,6 +8,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
import org.openflow.protocol.*;
import org.openflow.protocol.action.*;
@@ -16,7 +18,12 @@
import org.slf4j.LoggerFactory;
import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.floodlightcontroller.util.MACAddress;
import net.floodlightcontroller.util.OFMessageDamper;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
@@ -38,9 +45,11 @@
* @author Naoki Shiota
*
*/
-public class FlowPusher implements IFlowPusherService {
+public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+ private static boolean barrierIfEmpty = false;
+
// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
@@ -64,6 +73,12 @@
SUSPENDED,
}
+ /**
+ * Message queue attached to a switch.
+ * This consists of queue itself and variables used for limiting sending rate.
+ * @author Naoki Shiota
+ *
+ */
@SuppressWarnings("serial")
private class SwitchQueue extends ArrayDeque<OFMessage> {
QueueState state;
@@ -84,15 +99,20 @@
return true;
}
- long rate = last_sent_size / (current - last_sent_time);
-
- if (rate < max_rate) {
- return true;
- } else {
+ if (current == last_sent_time) {
return false;
}
+
+ // Check if sufficient time (from aspect of rate) elapsed or not.
+ long rate = last_sent_size / (current - last_sent_time);
+ return (rate < max_rate);
}
+ /**
+ * Log time and size of last sent data.
+ * @param current Time to be sent.
+ * @param size Size of sent data (in bytes).
+ */
void logSentData(long current, long size) {
last_sent_time = current;
last_sent_size = size;
@@ -100,11 +120,14 @@
}
- private OFMessageDamper messageDamper;
+ private OFMessageDamper messageDamper = null;
+ private IThreadPoolService threadPool = null;
private FloodlightContext context = null;
private BasicFactory factory = null;
- private Map<Long, FlowPusherProcess> threadMap = null;
+ private Map<Long, FlowPusherThread> threadMap = null;
+ private Map<Long, Map<Integer, OFBarrierReplyFuture>>
+ barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
private int number_thread = 1;
@@ -113,26 +136,31 @@
* @author Naoki Shiota
*
*/
- private class FlowPusherProcess implements Runnable {
+ private class FlowPusherThread extends Thread {
private Map<IOFSwitch,SwitchQueue> queues
= new HashMap<IOFSwitch,SwitchQueue>();
- private boolean isStopped = false;
- private boolean isMsgAdded = false;
+ private Semaphore mutex = new Semaphore(0);
@Override
public void run() {
log.debug("Begin Flow Pusher Process");
while (true) {
+ try {
+ // wait for message pushed to queue
+ mutex.acquire();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ log.debug("FlowPusherThread is interrupted");
+ return;
+ }
+
Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
synchronized (queues) {
entries = queues.entrySet();
}
- // Set taint flag to false at this moment.
- isMsgAdded = false;
-
for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
IOFSwitch sw = entry.getKey();
SwitchQueue queue = entry.getValue();
@@ -152,15 +180,14 @@
int i = 0;
while (! queue.isEmpty()) {
// Number of messages excess the limit
- if (++i >= MAX_MESSAGE_SEND) {
+ if (i >= MAX_MESSAGE_SEND) {
// Messages remains in queue
- isMsgAdded = true;
+ mutex.release();
break;
}
+ ++i;
OFMessage msg = queue.poll();
-
- // if need to send, call IOFSwitch#write()
try {
messageDamper.write(sw, msg, context);
log.debug("Pusher sends message : {}", msg);
@@ -172,42 +199,50 @@
}
sw.flush();
queue.logSentData(current_time, size);
+
+ if (queue.isEmpty() && barrierIfEmpty) {
+ barrier(sw);
+ }
}
}
}
-
- // sleep while all queues are empty
- while (! (isMsgAdded || isStopped)) {
- try {
- Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
- } catch (InterruptedException e) {
- e.printStackTrace();
- log.error("Thread.sleep failed");
- }
- }
-
- log.debug("Exit sleep loop.");
-
- if (isStopped) {
- log.debug("Pusher Process finished.");
- return;
- }
-
}
}
}
+ /**
+ * Initialize object with one thread.
+ */
public FlowPusher() {
-
}
+ /**
+ * Initialize object with threads of given number.
+ * @param number_thread Number of threads to handle messages.
+ */
public FlowPusher(int number_thread) {
this.number_thread = number_thread;
}
- public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
+ /**
+ * Set parameters needed for sending messages.
+ * @param context FloodlightContext used for sending messages.
+ * If null, FlowPusher uses default context.
+ * @param modContext FloodlightModuleContext used for acquiring
+ * ThreadPoolService and registering MessageListener.
+ * @param factory Factory object to create OFMessage objects.
+ * @param damper Message damper used for sending messages.
+ * If null, FlowPusher creates its own damper object.
+ */
+ public void init(FloodlightContext context,
+ FloodlightModuleContext modContext,
+ BasicFactory factory,
+ OFMessageDamper damper) {
this.context = context;
this.factory = factory;
+ this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
+ IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
+ flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
if (damper != null) {
messageDamper = damper;
@@ -228,12 +263,11 @@
return;
}
- threadMap = new HashMap<Long,FlowPusherProcess>();
+ threadMap = new HashMap<Long,FlowPusherThread>();
for (long i = 0; i < number_thread; ++i) {
- FlowPusherProcess runnable = new FlowPusherProcess();
- threadMap.put(i, runnable);
+ FlowPusherThread thread = new FlowPusherThread();
- Thread thread = new Thread(runnable);
+ threadMap.put(i, thread);
thread.start();
}
}
@@ -302,10 +336,8 @@
return;
}
- for (FlowPusherProcess runnable : threadMap.values()) {
- if (! runnable.isStopped) {
- runnable.isStopped = true;
- }
+ for (FlowPusherThread t : threadMap.values()) {
+ t.interrupt();
}
}
@@ -326,14 +358,14 @@
}
/**
- * Add OFMessage to the queue related to given switch.
+ * Add OFMessage to queue of the switch.
* @param sw Switch to which message is sent.
* @param msg Message to be sent.
* @return true if succeed.
*/
@Override
public boolean add(IOFSwitch sw, OFMessage msg) {
- FlowPusherProcess proc = getProcess(sw);
+ FlowPusherThread proc = getProcess(sw);
SwitchQueue queue = proc.queues.get(sw);
if (queue == null) {
@@ -349,8 +381,10 @@
log.debug("Message is pushed : {}", msg);
}
- proc.isMsgAdded = true;
-
+ if (proc.mutex.availablePermits() == 0) {
+ proc.mutex.release();
+ }
+
return true;
}
@@ -984,8 +1018,56 @@
return add(sw,fm);
}
+
+    /**
+     * Queue a BARRIER_REQUEST for the switch and block until its reply is
+     * delivered to the future returned by {@link #barrierAsync(IOFSwitch)}.
+     * @param sw Switch to which barrier message is pushed.
+     * @return BARRIER_REPLY message sent from switch, or null if no future
+     *         could be created (e.g. sw is null), the wait was interrupted,
+     *         or the future failed.
+     */
+    @Override
+    public OFBarrierReply barrier(IOFSwitch sw) {
+        OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+        if (future == null) {
+            return null;
+        }
+
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag: FlowPusherThread calls barrier() and is
+            // stopped via interrupt(), so swallowing the flag here would make the
+            // stop request disappear.
+            Thread.currentThread().interrupt();
+            // Pass the exception as the throwable argument so the logger
+            // records the stack trace (instead of printStackTrace()).
+            log.error("InterruptedException while waiting for BARRIER_REPLY", e);
+            return null;
+        } catch (ExecutionException e) {
+            log.error("ExecutionException while waiting for BARRIER_REPLY", e);
+            return null;
+        }
+    }
- private SwitchQueue getQueue(IOFSwitch sw) {
+    /**
+     * Queue a BARRIER_REQUEST for the switch without waiting for the reply.
+     * The returned future is stored in barrierFutures, keyed by switch id and
+     * transaction id, so that receive() can complete it.
+     * @param sw Switch to which barrier message is pushed.
+     * @return Future object of BARRIER_REPLY message, or null if sw is null
+     *         or the request could not be added to the switch queue.
+     */
+    @Override
+    public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+        // TODO creation of message and future should be moved to OFSwitchImpl
+
+        if (sw == null) {
+            return null;
+        }
+
+        OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+        msg.setXid(sw.getNextTransactionId());
+        int xid = msg.getXid();
+        long dpid = sw.getId();
+
+        OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, xid);
+
+        // Register the future BEFORE queueing the request: the pusher thread
+        // sends asynchronously, so the reply must never be able to arrive
+        // while the future is still unknown to receive().
+        synchronized (barrierFutures) {
+            Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(dpid);
+            if (map == null) {
+                map = new HashMap<Integer,OFBarrierReplyFuture>();
+                barrierFutures.put(dpid, map);
+            }
+            map.put(xid, future);
+            log.debug("Inserted future for {}", xid);
+        }
+
+        if (!add(sw, msg)) {
+            // The request was never queued, so no reply will come: undo the
+            // registration instead of leaving a future that cannot complete.
+            synchronized (barrierFutures) {
+                Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(dpid);
+                if (map != null) {
+                    map.remove(xid);
+                    if (map.isEmpty()) {
+                        barrierFutures.remove(dpid);
+                    }
+                }
+            }
+            future.cancel(true);
+            return null;
+        }
+
+        return future;
+    }
+
+ protected SwitchQueue getQueue(IOFSwitch sw) {
if (sw == null) {
return null;
}
@@ -993,14 +1075,48 @@
return getProcess(sw).queues.get(sw);
}
- private long getHash(IOFSwitch sw) {
- // TODO should consider equalization algorithm
- return sw.getId() % number_thread;
+    /**
+     * Map a switch to the key of the thread (in threadMap) that handles it.
+     * @param sw Switch to look up.
+     * @return Value in the range [0, number_thread).
+     */
+    protected long getHash(IOFSwitch sw) {
+        // This code assumes DPID is sequentially assigned.
+        // TODO consider equalization algorithm
+        // Java's % keeps the sign of the dividend, so a DPID with the top bit
+        // set would give a negative key that is not present in threadMap.
+        long mod = sw.getId() % number_thread;
+        return (mod < 0) ? mod + number_thread : mod;
}
- private FlowPusherProcess getProcess(IOFSwitch sw) {
+ protected FlowPusherThread getProcess(IOFSwitch sw) {
long hash = getHash(sw);
return threadMap.get(hash);
}
+
+ /**
+  * Return the fixed name of this message listener.
+  * @return Always the string "flowpusher".
+  */
+ @Override
+ public String getName() {
+ return "flowpusher";
+ }
+
+ /**
+  * Declare no prerequisite listener: the result is false for every
+  * message type and listener name.
+  * NOTE(review): presumably consulted by the controller when ordering
+  * listeners - confirm against IOFMessageListener.
+  * @param type Message type being asked about (ignored).
+  * @param name Name of the other listener (ignored).
+  * @return Always false.
+  */
+ @Override
+ public boolean isCallbackOrderingPrereq(OFType type, String name) {
+ return false;
+ }
+
+ /**
+  * Declare no post-requisite listener: the result is false for every
+  * message type and listener name.
+  * NOTE(review): presumably consulted by the controller when ordering
+  * listeners - confirm against IOFMessageListener.
+  * @param type Message type being asked about (ignored).
+  * @param name Name of the other listener (ignored).
+  * @return Always false.
+  */
+ @Override
+ public boolean isCallbackOrderingPostreq(OFType type, String name) {
+ return false;
+ }
+
+    /**
+     * Complete the pending barrier future that matches the given message.
+     * The future is looked up in barrierFutures by switch id and transaction
+     * id; if none is registered the message is ignored.
+     * @param sw Switch the message came from.
+     * @param msg Received message; its transaction id selects the future.
+     * @param cntx Floodlight context (unused).
+     * @return Always Command.CONTINUE.
+     */
+    @Override
+    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+        OFBarrierReplyFuture future;
+
+        // barrierFutures is a plain HashMap that barrierAsync() mutates while
+        // holding this lock, so it must be read under the same lock. The entry
+        // is removed here; otherwise every delivered future would stay in the
+        // map forever.
+        synchronized (barrierFutures) {
+            Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
+            if (map == null) {
+                return Command.CONTINUE;
+            }
+
+            future = map.remove(msg.getXid());
+            if (map.isEmpty()) {
+                barrierFutures.remove(sw.getId());
+            }
+        }
+
+        if (future == null) {
+            return Command.CONTINUE;
+        }
+
+        log.debug("Received BARRIER_REPLY : {}", msg);
+        // Deliver outside the lock so the future's own handling never runs
+        // while barrierFutures is held.
+        future.deliverFuture(sw, msg);
+
+        return Command.CONTINUE;
+    }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index b2e4552..f3f5c50 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -2,7 +2,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -13,23 +12,8 @@
import org.openflow.protocol.OFFlowMod;
import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFPacketOut;
import org.openflow.protocol.OFPort;
import org.openflow.protocol.OFStatisticsRequest;
-import org.openflow.protocol.action.OFAction;
-import org.openflow.protocol.action.OFActionDataLayerDestination;
-import org.openflow.protocol.action.OFActionDataLayerSource;
-import org.openflow.protocol.action.OFActionEnqueue;
-import org.openflow.protocol.action.OFActionNetworkLayerDestination;
-import org.openflow.protocol.action.OFActionNetworkLayerSource;
-import org.openflow.protocol.action.OFActionNetworkTypeOfService;
-import org.openflow.protocol.action.OFActionOutput;
-import org.openflow.protocol.action.OFActionStripVirtualLan;
-import org.openflow.protocol.action.OFActionTransportLayerDestination;
-import org.openflow.protocol.action.OFActionTransportLayerSource;
-import org.openflow.protocol.action.OFActionVirtualLanIdentifier;
-import org.openflow.protocol.action.OFActionVirtualLanPriorityCodePoint;
import org.openflow.protocol.statistics.OFFlowStatisticsReply;
import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
import org.openflow.protocol.statistics.OFStatistics;
@@ -37,36 +21,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.tinkerpop.blueprints.Direction;
-
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.IOFSwitchListener;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
-import net.floodlightcontroller.core.module.IFloodlightModule;
-import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.restserver.IRestApiService;
-import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.graph.GraphDBOperation;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
-import net.onrc.onos.ofcontroller.core.module.IOnosService;
-import net.onrc.onos.ofcontroller.floodlightlistener.INetworkGraphService;
import net.onrc.onos.ofcontroller.util.Dpid;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction;
-import net.onrc.onos.ofcontroller.util.FlowEntryActions;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionEnqueue;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionOutput;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetEthernetAddr;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetIPv4Addr;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetIpToS;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetTcpUdpPort;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetVlanId;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetVlanPriority;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionStripVlan;
import net.onrc.onos.registry.controller.IControllerRegistryService;
public class FlowSynchronizer implements IFlowSyncService, IOFSwitchListener {
@@ -86,8 +50,8 @@
public void synchronize(IOFSwitch sw) {
Synchroizer sync = new Synchroizer(sw);
Thread t = new Thread(sync);
- t.start();
switchThread.put(sw, t);
+ t.start();
}
@Override
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index e16dd20..94d6e35 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -1,43 +1,75 @@
package net.onrc.onos.ofcontroller.flowprogrammer;
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowPath;
-import org.openflow.protocol.OFMessage;
-
public interface IFlowPusherService extends IFloodlightService {
/**
- * Add a message to the queue of a switch.
- * @param sw
- * @param msg
- * @return
+ * Add a message to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param msg Message object to be added.
+ * @return true if message is successfully added to a queue.
*/
boolean add(IOFSwitch sw, OFMessage msg);
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param flowPath FlowPath object used for creating message.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry);
+
+ /**
+ * Create a message from IFlowEntry and add it to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param flowObj IFlowPath object used for creating message.
+ * @param flowEntryObj IFlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj);
+
+ /**
+ * Add BARRIER message to queue and wait for reply.
+ * @param sw Switch to which barrier message is pushed.
+ * @return BARRIER_REPLY message sent from switch.
+ */
+ OFBarrierReply barrier(IOFSwitch sw);
+
+ /**
+ * Add BARRIER message to queue asynchronously.
+ * @param sw Switch to which barrier message is pushed.
+ * @return Future object of BARRIER_REPLY message which will be sent from switch.
+ */
+ OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw);
/**
* Suspend pushing message to a switch.
- * @param sw
+ * @param sw Switch to which pushing of messages is suspended.
* @return true if success
*/
boolean suspend(IOFSwitch sw);
/**
* Resume pushing message to a switch.
- * @param sw
+ * @param sw Switch to which pushing of messages is resumed.
* @return true if success
*/
boolean resume(IOFSwitch sw);
/**
* Get whether pushing of message is suspended or not.
- * @param sw
- * @return true if suspended
+ * @param sw Switch to be checked.
+ * @return true if suspended.
*/
boolean isSuspended(IOFSwitch sw);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
new file mode 100644
index 0000000..3013f5a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
@@ -0,0 +1,49 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import java.util.concurrent.TimeUnit;
+
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFType;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+
+/**
+ * Future holding the BARRIER_REPLY for one barrier request, identified by
+ * switch and transaction id. It is finished as soon as a reply has been
+ * handed to {@link #handleReply(IOFSwitch, OFMessage)}.
+ */
+public class OFBarrierReplyFuture extends OFMessageFuture<OFBarrierReply> {
+
+    /** Set once a reply has been handled; read by isFinished(). */
+    protected volatile boolean finished;
+
+    /**
+     * Create a future using the superclass's default timeout.
+     * @param tp Thread pool service passed to the superclass.
+     * @param sw Switch the barrier request is sent to.
+     * @param transactionId Transaction id of the barrier request.
+     */
+    public OFBarrierReplyFuture(IThreadPoolService tp,
+            IOFSwitch sw, int transactionId) {
+        // The awaited response is a BARRIER_REPLY, not a FEATURES_REPLY.
+        super(tp, sw, OFType.BARRIER_REPLY, transactionId);
+        init();
+    }
+
+    /**
+     * Create a future with an explicit timeout.
+     * @param tp Thread pool service passed to the superclass.
+     * @param sw Switch the barrier request is sent to.
+     * @param transactionId Transaction id of the barrier request.
+     * @param timeout Timeout value passed to the superclass.
+     * @param unit Unit of the timeout value.
+     */
+    public OFBarrierReplyFuture(IThreadPoolService tp,
+            IOFSwitch sw, int transactionId, long timeout, TimeUnit unit) {
+        // The awaited response is a BARRIER_REPLY, not a FEATURES_REPLY.
+        super(tp, sw, OFType.BARRIER_REPLY, transactionId, timeout, unit);
+        init();
+    }
+
+    /** Reset the future to the "no reply yet" state. */
+    private void init() {
+        this.finished = false;
+        this.result = null;
+    }
+
+    /**
+     * Store the reply as the result and mark the future as finished.
+     * @param sw Switch the reply came from (unused).
+     * @param msg Reply message; must be an OFBarrierReply.
+     */
+    @Override
+    protected void handleReply(IOFSwitch sw, OFMessage msg) {
+        this.result = (OFBarrierReply) msg;
+        this.finished = true;
+    }
+
+    /**
+     * @return true once a reply has been handled.
+     */
+    @Override
+    protected boolean isFinished() {
+        return finished;
+    }
+
+    /** Delegates to the superclass; no extra clean-up is done here. */
+    @Override
+    protected void unRegister() {
+        super.unRegister();
+    }
+}