Merge branch 'syncdev' into syncdev17
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index a59a9f9..ebc86e4 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -4,14 +4,16 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
public class FlowProgrammer implements IFloodlightModule {
- private static final boolean enableFlowSync = false;
+ private static final boolean enableFlowSync = true;
protected volatile IFloodlightProviderService floodlightProvider;
@@ -31,7 +33,7 @@
public void init(FloodlightModuleContext context)
throws FloodlightModuleException {
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
- pusher.init(null, floodlightProvider.getOFMessageFactory(), null);
+ pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
if (enableFlowSync) {
synchronizer.init(context);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 532477a..6379f26 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -8,6 +8,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import org.openflow.protocol.*;
import org.openflow.protocol.action.*;
@@ -16,7 +17,12 @@
import org.slf4j.LoggerFactory;
import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.floodlightcontroller.util.MACAddress;
import net.floodlightcontroller.util.OFMessageDamper;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
@@ -38,7 +44,7 @@
* @author Naoki Shiota
*
*/
-public class FlowPusher implements IFlowPusherService {
+public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
// NOTE: Below are moved from FlowManager.
@@ -64,6 +70,12 @@
SUSPENDED,
}
+ /**
+ * Message queue attached to a switch.
+ * This consists of queue itself and variables used for limiting sending rate.
+ * @author Naoki Shiota
+ *
+ */
@SuppressWarnings("serial")
private class SwitchQueue extends ArrayDeque<OFMessage> {
QueueState state;
@@ -84,13 +96,9 @@
return true;
}
+ // Check if sufficient time (from aspect of rate) elapsed or not.
long rate = last_sent_size / (current - last_sent_time);
-
- if (rate < max_rate) {
- return true;
- } else {
- return false;
- }
+ return (rate < max_rate);
}
void logSentData(long current, long size) {
@@ -100,11 +108,14 @@
}
- private OFMessageDamper messageDamper;
+ private OFMessageDamper messageDamper = null;
+ private IThreadPoolService threadPool = null;
private FloodlightContext context = null;
private BasicFactory factory = null;
private Map<Long, FlowPusherProcess> threadMap = null;
+ private Map<Long, Map<Integer, OFBarrierReplyFuture>>
+ barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
private int number_thread = 1;
@@ -197,17 +208,39 @@
}
}
+ /**
+ * Initialize object with one thread.
+ */
public FlowPusher() {
-
}
+ /**
+ * Initialize object with threads of given number.
+ * @param number_thread Number of threads to handle messages.
+ */
public FlowPusher(int number_thread) {
this.number_thread = number_thread;
}
- public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
+ /**
+ * Set parameters needed for sending messages.
+ * @param context FloodlightContext used for sending messages.
+ * If null, FlowPusher uses default context.
+ * @param modContext FloodlightModuleContext used for acquiring
+ * ThreadPoolService and registering MessageListener.
+ * @param factory Factory object to create OFMessage objects.
+ * @param damper Message damper used for sending messages.
+ * If null, FlowPusher creates its own damper object.
+ */
+ public void init(FloodlightContext context,
+ FloodlightModuleContext modContext,
+ BasicFactory factory,
+ OFMessageDamper damper) {
this.context = context;
this.factory = factory;
+ this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
+ IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
+ flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
if (damper != null) {
messageDamper = damper;
@@ -326,7 +359,7 @@
}
/**
- * Add OFMessage to the queue related to given switch.
+ * Add OFMessage to queue of the switch.
* @param sw Switch to which message is sent.
* @param msg Message to be sent.
* @return true if succeed.
@@ -984,8 +1017,52 @@
return add(sw,fm);
}
+
+ @Override
+ public OFBarrierReply barrier(IOFSwitch sw) {
+ OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+ if (future == null) {
+ return null;
+ }
+
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ log.error("InterruptedException: {}", e);
+ return null;
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ log.error("ExecutionException: {}", e);
+ return null;
+ }
+ }
- private SwitchQueue getQueue(IOFSwitch sw) {
+ @Override
+ public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+ // TODO creation of message and future should be moved to OFSwitchImpl
+
+ OFMessage msg = factory.getMessage(OFType.BARRIER_REQUEST);
+ msg.setXid(sw.getNextTransactionId());
+ add(sw, msg);
+
+ // TODO create Future object of message
+ OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+
+ synchronized (barrierFutures) {
+ Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
+ if (map == null) {
+ map = new HashMap<Integer,OFBarrierReplyFuture>();
+ barrierFutures.put(sw.getId(), map);
+ }
+ map.put(msg.getXid(), future);
+ log.debug("Created future for {}", msg.getXid());
+ }
+
+ return future;
+ }
+
+ protected SwitchQueue getQueue(IOFSwitch sw) {
if (sw == null) {
return null;
}
@@ -993,14 +1070,53 @@
return getProcess(sw).queues.get(sw);
}
- private long getHash(IOFSwitch sw) {
- // TODO should consider equalization algorithm
+ protected long getHash(IOFSwitch sw) {
+ // This code assumes DPID is sequentially assigned.
+ // TODO consider equalization algorithm
return sw.getId() % number_thread;
}
- private FlowPusherProcess getProcess(IOFSwitch sw) {
+ protected FlowPusherProcess getProcess(IOFSwitch sw) {
long hash = getHash(sw);
return threadMap.get(hash);
}
+
+ @Override
+ public String getName() {
+ return "flowpusher";
+ }
+
+ @Override
+ public boolean isCallbackOrderingPrereq(OFType type, String name) {
+ return false;
+ }
+
+ @Override
+ public boolean isCallbackOrderingPostreq(OFType type, String name) {
+ return false;
+ }
+
+ @Override
+ public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+ // This check can be skipped (since Controller must filter).
+// if (msg.getType() != OFType.BARRIER_REPLY) {
+// return Command.CONTINUE;
+// }
+
+ Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
+ if (map == null) {
+ return Command.CONTINUE;
+ }
+
+ OFBarrierReplyFuture future = map.get(msg.getXid());
+ if (future == null) {
+ return Command.CONTINUE;
+ }
+
+ log.debug("Received BARRIER_REPLY : {}", msg);
+ future.deliverFuture(sw, msg);
+
+ return Command.CONTINUE;
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index e16dd20..94d6e35 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -1,43 +1,75 @@
package net.onrc.onos.ofcontroller.flowprogrammer;
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowPath;
-import org.openflow.protocol.OFMessage;
-
public interface IFlowPusherService extends IFloodlightService {
/**
- * Add a message to the queue of a switch.
- * @param sw
- * @param msg
- * @return
+ * Add a message to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param msg Message object to be added.
+ * @return true if message is successfully added to a queue.
*/
boolean add(IOFSwitch sw, OFMessage msg);
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param flowPath FlowPath object used for creating message.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry);
+
+ /**
+ * Create a message from IFlowEntry and add it to the queue of the switch.
+ * @param sw Switch to which message is pushed.
+ * @param flowObj IFlowPath object used for creating message.
+ * @param flowEntryObj IFlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj);
+
+ /**
+ * Add BARRIER message to queue and wait for reply.
+ * @param sw Switch to which barrier message is pushed.
+ * @return BARRIER_REPLY message sent from switch.
+ */
+ OFBarrierReply barrier(IOFSwitch sw);
+
+ /**
+ * Add BARRIER message to queue asynchronously.
+ * @param sw Switch to which barrier message is pushed.
+ * @return Future object of BARRIER_REPLY message which will be sent from switch.
+ */
+ OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw);
/**
* Suspend pushing message to a switch.
- * @param sw
+ * @param sw Switch to be suspended pushing message.
* @return true if success
*/
boolean suspend(IOFSwitch sw);
/**
* Resume pushing message to a switch.
- * @param sw
+ * @param sw Switch to be resumed pushing message.
* @return true if success
*/
boolean resume(IOFSwitch sw);
/**
* Get whether pushing of message is suspended or not.
- * @param sw
- * @return true if suspended
+ * @param sw Switch to be checked.
+ * @return true if suspended.
*/
boolean isSuspended(IOFSwitch sw);
}