Fixed an issue that caused FlowPusher to go into an infinite loop.
FlowPusher used to use the IOFSwitch object as a key in its data structures.
When the IOFSwitchListener inteface changed to include only the DPID in its
callback methods instead of the IOFSwitch, the FlowPusher is now not able
to get a reference to the switch once it has disconnected. This means
FlowPusher can't clean out its data structures and it if messages are still
in the queue for the switch it will try and send them forever.
The fix is to change FlowPusher so its internal data structures use the DPID
as the key. While doing this I also changed most of its APIs so that they also
reference switches by DPID rather than by IOFSwitch.
Change-Id: I255d64fdac21d8b147f991ff41f6ecace310b116
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowProgrammer.java
index 5711287..0b89107 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowProgrammer.java
@@ -16,6 +16,7 @@
import net.floodlightcontroller.restserver.IRestApiService;
import net.onrc.onos.core.flowprogrammer.web.FlowProgrammerWebRoutable;
import net.onrc.onos.core.registry.IControllerRegistryService;
+import net.onrc.onos.core.util.Dpid;
import org.projectfloodlight.openflow.protocol.OFPortDesc;
import org.slf4j.Logger;
@@ -149,17 +150,13 @@
@Override
public void switchDisconnected(long swId) {
- IOFSwitch sw = floodlightProvider.getSwitch(swId);
- if (sw == null) {
- log.warn("Removed switch not available {} ", swId);
- return;
- }
- log.debug("Switch removed: {}", sw.getId());
+ Dpid dpid = new Dpid(swId);
+ log.debug("Switch removed: {}", dpid);
- if (ENABLE_FLOW_SYNC) {
- synchronizer.interrupt(sw);
- }
- pusher.deleteQueue(sw, true);
+ //if (ENABLE_FLOW_SYNC) {
+ //synchronizer.interrupt(sw);
+ //}
+ pusher.deleteQueue(dpid, true);
}
@Override
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
index c40eac0..aa1c057 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
@@ -25,6 +25,7 @@
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.onrc.onos.core.intent.FlowEntry;
+import net.onrc.onos.core.util.Dpid;
import org.apache.commons.lang3.tuple.Pair;
import org.projectfloodlight.openflow.protocol.OFBarrierReply;
@@ -36,8 +37,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.cache.CacheBuilder;
-
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
@@ -51,11 +50,23 @@
*/
public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
- protected static final int DEFAULT_NUMBER_THREAD = 1;
+ private static final int DEFAULT_NUMBER_THREAD = 1;
// Number of messages sent to switch at once
protected static final int MAX_MESSAGE_SEND = 100;
+ private FloodlightModuleContext context = null;
+ private IThreadPoolService threadPool = null;
+ private IFloodlightProviderService floodlightProvider = null;
+
+ // Map of threads versus dpid
+ private Map<Long, FlowPusherThread> threadMap = null;
+ // Map from (DPID and transaction ID) to Future objects.
+ private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
+ new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
+
+ private int numberThread;
+
private static class SwitchQueueEntry {
OFMessage msg;
@@ -199,12 +210,12 @@
final long dpid;
final long xid;
- static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
- return new BarrierInfo(sw.getId(), req.getXid());
+ static BarrierInfo create(long dpid, OFBarrierRequest req) {
+ return new BarrierInfo(dpid, req.getXid());
}
- static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
- return new BarrierInfo(sw.getId(), rpy.getXid());
+ static BarrierInfo create(long dpid, OFBarrierReply rpy) {
+ return new BarrierInfo(dpid, rpy.getXid());
}
private BarrierInfo(long dpid, long xid) {
@@ -240,26 +251,11 @@
}
- private FloodlightModuleContext context = null;
- private IThreadPoolService threadPool = null;
- private IFloodlightProviderService floodlightProvider = null;
-
- // Map of threads versus dpid
- private Map<Long, FlowPusherThread> threadMap = null;
- // Map from (DPID and transaction ID) to Future objects.
- private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
- new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
-
- private int numberThread;
-
/**
* Main thread that reads messages from queues and sends them to switches.
*/
- private static class FlowPusherThread extends Thread {
- // Weak ConcurrentHashMap
- private Map<IOFSwitch, SwitchQueue> assignedQueues = CacheBuilder.newBuilder()
- .weakKeys()
- .<IOFSwitch, SwitchQueue>build().asMap();
+ private class FlowPusherThread extends Thread {
+ private Map<Dpid, SwitchQueue> assignedQueues = new ConcurrentHashMap<>();
final Lock queuingLock = new ReentrantLock();
final Condition messagePushed = queuingLock.newCondition();
@@ -283,10 +279,9 @@
}
}
- for (Iterator<Entry<IOFSwitch, SwitchQueue>> it = assignedQueues
+ for (Iterator<Entry<Dpid, SwitchQueue>> it = assignedQueues
.entrySet().iterator(); it.hasNext();) {
- Entry<IOFSwitch, SwitchQueue> entry = it.next();
- IOFSwitch sw = entry.getKey();
+ Entry<Dpid, SwitchQueue> entry = it.next();
SwitchQueue queue = entry.getValue();
if (queue == null) {
@@ -294,7 +289,7 @@
}
synchronized (queue) {
- processQueue(sw, queue, MAX_MESSAGE_SEND);
+ processQueue(entry.getKey(), queue, MAX_MESSAGE_SEND);
if (queue.toBeDeleted && !queue.hasMessageToSend()) {
// remove queue if flagged to be.
it.remove();
@@ -308,16 +303,24 @@
* Read messages from queue and send them to the switch. If number of
* messages excess the limit, stop sending messages.
* <p>
- * @param sw Switch to which messages will be sent.
+ * @param dpid DPID of the switch to which messages will be sent.
* @param queue Queue of messages.
* @param maxMsg Limitation of number of messages to be sent. If set to
* 0, all messages in queue will be sent.
*/
- private void processQueue(IOFSwitch sw, SwitchQueue queue, int maxMsg) {
+ private void processQueue(Dpid dpid, SwitchQueue queue, int maxMsg) {
// check sending rate and determine it to be sent or not
long currentTime = System.currentTimeMillis();
long size = 0;
+ IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
+ if (sw == null) {
+ // FlowPusher state for this switch will get cleaned up soon
+ // due to the switchDisconnected event
+ log.debug("Switch {} not found when processing queue", dpid);
+ return;
+ }
+
if (sw.isConnected() && queue.isSendable(currentTime)) {
int i = 0;
while (queue.hasMessageToSend()) {
@@ -419,13 +422,13 @@
}
@Override
- public boolean suspend(IOFSwitch sw) {
- SwitchQueue queue = getQueue(sw);
+ public boolean suspend(Dpid dpid) {
+ SwitchQueue queue = getQueue(dpid);
if (queue == null) {
// create queue in case suspend is called before first message
// addition
- queue = createQueueImpl(sw);
+ queue = createQueueImpl(dpid);
}
synchronized (queue) {
@@ -438,11 +441,11 @@
}
@Override
- public boolean resume(IOFSwitch sw) {
- SwitchQueue queue = getQueue(sw);
+ public boolean resume(Dpid dpid) {
+ SwitchQueue queue = getQueue(dpid);
if (queue == null) {
- log.error("No queue is attached to DPID: {}", sw.getStringId());
+ log.error("No queue is attached to DPID: {}", dpid);
return false;
}
@@ -451,7 +454,7 @@
queue.state = QueueState.READY;
// Free the latch if queue has any messages
- FlowPusherThread thread = getProcessingThread(sw);
+ FlowPusherThread thread = getProcessingThread(dpid);
if (queue.hasMessageToSend()) {
thread.notifyMessagePushed();
}
@@ -462,8 +465,8 @@
}
@Override
- public QueueState getState(IOFSwitch sw) {
- SwitchQueue queue = getQueue(sw);
+ public QueueState getState(Dpid dpid) {
+ SwitchQueue queue = getQueue(dpid);
if (queue == null) {
return QueueState.UNKNOWN;
@@ -486,14 +489,14 @@
}
@Override
- public void setRate(IOFSwitch sw, long rate) {
- SwitchQueue queue = getQueue(sw);
+ public void setRate(Dpid dpid, long rate) {
+ SwitchQueue queue = getQueue(dpid);
if (queue == null) {
- queue = createQueueImpl(sw);
+ queue = createQueueImpl(dpid);
}
if (rate > 0) {
- log.debug("rate for {} is set to {}", sw.getStringId(), rate);
+ log.debug("rate for {} is set to {}", dpid, rate);
synchronized (queue) {
queue.maxRate = rate;
}
@@ -503,43 +506,43 @@
@Override
@SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
justification = "Future versions of createQueueImpl() might return null")
- public boolean createQueue(IOFSwitch sw) {
- SwitchQueue queue = createQueueImpl(sw);
+ public boolean createQueue(Dpid dpid) {
+ SwitchQueue queue = createQueueImpl(dpid);
return (queue != null);
}
- protected SwitchQueue createQueueImpl(IOFSwitch sw) {
- SwitchQueue queue = getQueue(sw);
+ protected SwitchQueue createQueueImpl(Dpid dpid) {
+ SwitchQueue queue = getQueue(dpid);
if (queue != null) {
return queue;
}
- FlowPusherThread proc = getProcessingThread(sw);
+ FlowPusherThread proc = getProcessingThread(dpid);
queue = new SwitchQueue();
queue.state = QueueState.READY;
- proc.assignedQueues.put(sw, queue);
+ proc.assignedQueues.put(dpid, queue);
return queue;
}
@Override
- public boolean deleteQueue(IOFSwitch sw) {
- return deleteQueue(sw, false);
+ public boolean deleteQueue(Dpid dpid) {
+ return deleteQueue(dpid, false);
}
@Override
- public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
- FlowPusherThread proc = getProcessingThread(sw);
+ public boolean deleteQueue(Dpid dpid, boolean forceStop) {
+ FlowPusherThread proc = getProcessingThread(dpid);
if (forceStop) {
- SwitchQueue queue = proc.assignedQueues.remove(sw);
+ SwitchQueue queue = proc.assignedQueues.remove(dpid);
if (queue == null) {
return false;
}
return true;
} else {
- SwitchQueue queue = getQueue(sw);
+ SwitchQueue queue = getQueue(dpid);
if (queue == null) {
return false;
}
@@ -551,75 +554,81 @@
}
@Override
- public boolean add(IOFSwitch sw, OFMessage msg) {
- return add(sw, msg, MsgPriority.NORMAL);
+ public boolean add(Dpid dpid, OFMessage msg) {
+ return add(dpid, msg, MsgPriority.NORMAL);
}
@Override
- public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
- return addMessageImpl(sw, msg, priority);
+ public boolean add(Dpid dpid, OFMessage msg, MsgPriority priority) {
+ return addMessageImpl(dpid, msg, priority);
}
@Override
public void pushFlowEntries(
- Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+ Collection<Pair<Dpid, FlowEntry>> entries) {
pushFlowEntries(entries, MsgPriority.NORMAL);
}
@Override
public void pushFlowEntries(
- Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
+ Collection<Pair<Dpid, FlowEntry>> entries, MsgPriority priority) {
- for (Pair<IOFSwitch, FlowEntry> entry : entries) {
+ for (Pair<Dpid, FlowEntry> entry : entries) {
add(entry.getLeft(), entry.getRight(), priority);
}
}
@Override
- public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
- pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
+ public void pushFlowEntry(Dpid dpid, FlowEntry flowEntry) {
+ pushFlowEntry(dpid, flowEntry, MsgPriority.NORMAL);
}
@Override
- public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
- Collection<Pair<IOFSwitch, FlowEntry>> entries = new LinkedList<>();
+ public void pushFlowEntry(Dpid dpid, FlowEntry flowEntry, MsgPriority priority) {
+ Collection<Pair<Dpid, FlowEntry>> entries = new LinkedList<>();
- entries.add(Pair.of(sw, flowEntry));
+ entries.add(Pair.of(dpid, flowEntry));
pushFlowEntries(entries, priority);
}
/**
* Create a message from FlowEntry and add it to the queue of the switch.
* <p>
- * @param sw Switch to which message is pushed.
+ * @param dpid DPID of the switch to which the message is pushed.
* @param flowEntry FlowEntry object used for creating message.
* @return true if message is successfully added to a queue.
*/
- private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+ private boolean add(Dpid dpid, FlowEntry flowEntry, MsgPriority priority) {
+ IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
+ if (sw == null) {
+ log.warn("Couldn't find switch {} when pushing message", dpid);
+ return false;
+ }
+
//
// Create the OpenFlow Flow Modification Entry to push
//
OFFlowMod fm = flowEntry.buildFlowMod(sw.getFactory());
// log.trace("Pushing flow mod {}", fm);
- return addMessageImpl(sw, fm, priority);
+ return addMessageImpl(dpid, fm, priority);
}
/**
* Add message to queue.
* <p>
- * @param sw
- * @param msg
- * @param priority
+ * @param dpid DPID of the switch to which the message is sent
+ * @param msg message to send to the switch
+ * @param priority priority of the message
* @return true if the message was added successfully, otherwise false
*/
- protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
- FlowPusherThread thread = getProcessingThread(sw);
+ protected boolean addMessageImpl(Dpid dpid, OFMessage msg, MsgPriority priority) {
+ FlowPusherThread thread = getProcessingThread(dpid);
- SwitchQueue queue = getQueue(sw);
+ SwitchQueue queue = getQueue(dpid);
// create queue at first addition of message
if (queue == null) {
- queue = createQueueImpl(sw);
+ queue = createQueueImpl(dpid);
}
SwitchQueueEntry entry = new SwitchQueueEntry(msg);
@@ -627,7 +636,8 @@
synchronized (queue) {
queue.add(entry, priority);
if (log.isTraceEnabled()) {
- log.trace("Message is pushed to switch {}: {}", sw.getStringId(), entry.getOFMessage());
+ log.trace("Message is pushed to switch {}: {}",
+ dpid, entry.getOFMessage());
}
}
@@ -637,8 +647,8 @@
}
@Override
- public OFBarrierReply barrier(IOFSwitch sw) {
- OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+ public OFBarrierReply barrier(Dpid dpid) {
+ OFMessageFuture<OFBarrierReply> future = barrierAsync(dpid);
if (future == null) {
return null;
}
@@ -653,9 +663,9 @@
}
@Override
- public OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw) {
+ public OFMessageFuture<OFBarrierReply> barrierAsync(Dpid dpid) {
// TODO creation of message and future should be moved to OFSwitchImpl
-
+ IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
if (sw == null) {
return null;
}
@@ -663,8 +673,8 @@
OFBarrierRequest msg = createBarrierRequest(sw);
OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw,
(int) msg.getXid());
- barrierFutures.put(BarrierInfo.create(sw, msg), future);
- addMessageImpl(sw, msg, MsgPriority.NORMAL);
+ barrierFutures.put(BarrierInfo.create(dpid.value(), msg), future);
+ addMessageImpl(dpid, msg, MsgPriority.NORMAL);
return future;
}
@@ -683,42 +693,42 @@
/**
* Get a queue attached to a switch.
* <p>
- * @param sw Switch object
+ * @param dpid DPID of the switch
* @return Queue object
*/
- protected SwitchQueue getQueue(IOFSwitch sw) {
- if (sw == null) {
+ protected SwitchQueue getQueue(Dpid dpid) {
+ if (dpid == null) {
return null;
}
- FlowPusherThread th = getProcessingThread(sw);
+ FlowPusherThread th = getProcessingThread(dpid);
if (th == null) {
return null;
}
- return th.assignedQueues.get(sw);
+ return th.assignedQueues.get(dpid);
}
/**
* Get a hash value correspondent to a switch.
* <p>
- * @param sw Switch object
+ * @param dpid DPID of the switch
* @return Hash value
*/
- protected long getHash(IOFSwitch sw) {
+ protected long getHash(long dpid) {
// This code assumes DPID is sequentially assigned.
// TODO consider equalization algorithm
- return sw.getId() % numberThread;
+ return dpid % numberThread;
}
/**
* Get a Thread object which processes the queue attached to a switch.
* <p>
- * @param sw Switch object
+ * @param dpid DPID of the switch
* @return Thread object
*/
- protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
- long hash = getHash(sw);
+ protected FlowPusherThread getProcessingThread(Dpid dpid) {
+ long hash = getHash(dpid.value());
return threadMap.get(hash);
}
@@ -751,7 +761,7 @@
}
OFBarrierReply reply = (OFBarrierReply) msg;
- BarrierInfo info = BarrierInfo.create(sw, reply);
+ BarrierInfo info = BarrierInfo.create(sw.getId(), reply);
// Deliver future if exists
OFBarrierReplyFuture future = barrierFutures.get(info);
if (future != null) {
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizer.java
index 0441ddc..acfb664 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizer.java
@@ -9,6 +9,7 @@
import java.util.concurrent.FutureTask;
import net.floodlightcontroller.core.IOFSwitch;
+import net.onrc.onos.core.util.Dpid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +80,7 @@
@Override
public SyncResult call() {
- pusher.suspend(sw);
+ pusher.suspend(new Dpid(sw.getId()));
try {
long start = System.nanoTime();
Set<FlowEntryWrapper> graphEntries = getFlowEntriesFromGraph();
@@ -96,7 +97,7 @@
return result;
} finally {
- pusher.resume(sw);
+ pusher.resume(new Dpid(sw.getId()));
}
}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java
index 25162aa..1026855 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java
@@ -2,10 +2,10 @@
import java.util.Collection;
-import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.internal.OFMessageFuture;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.core.intent.FlowEntry;
+import net.onrc.onos.core.util.Dpid;
import org.apache.commons.lang3.tuple.Pair;
import org.projectfloodlight.openflow.protocol.OFBarrierReply;
@@ -20,9 +20,9 @@
*/
public interface IFlowPusherService extends IFloodlightService {
public static enum MsgPriority {
- HIGH, // High priority: e.g. flow synchronization
+ HIGH, // High priority: e.g. flow synchronization
NORMAL, // Normal priority
-// LOW, // Low priority, not needed for now
+ // LOW, // Low priority, not needed for now
}
public static enum QueueState {
@@ -34,53 +34,53 @@
/**
* Create a queue correspondent to the switch.
*
- * @param sw Switch to which new queue is attached.
+ * @param dpid DPID of the switch to which new queue is attached.
* @return true if new queue is successfully created.
*/
- boolean createQueue(IOFSwitch sw);
+ boolean createQueue(Dpid dpid);
/**
* Delete a queue correspondent to the switch.
* Messages remains in queue will be all sent before queue is deleted.
*
- * @param sw Switch of which queue is deleted.
+ * @param dpid DPID of the switch from which the queue is deleted.
* @return true if queue is successfully deleted.
*/
- boolean deleteQueue(IOFSwitch sw);
+ boolean deleteQueue(Dpid dpid);
/**
* Delete a queue correspondent to the switch.
* By setting force flag on, queue will be deleted immediately.
*
- * @param sw Switch of which queue is deleted.
+ * @param dpid DPID of the switch from which the queue is deleted.
* @param forceStop If this flag is set to true, queue will be deleted
* immediately regardless of any messages in the queue.
* If false, all messages will be sent to switch and queue will
* be deleted after that.
* @return true if queue is successfully deleted or flagged to be deleted.
*/
- boolean deleteQueue(IOFSwitch sw, boolean forceStop);
+ boolean deleteQueue(Dpid dpid, boolean forceStop);
/**
* Add a message to the queue of the switch with normal priority.
* <p/>
* Note: Notification is NOT delivered for the pushed message.
*
- * @param sw Switch to which message is pushed.
+ * @param dpid DPID of the switch to which the message is pushed.
* @param msg Message object to be added.
* @return true if message is successfully added to a queue.
*/
- boolean add(IOFSwitch sw, OFMessage msg);
+ boolean add(Dpid dpid, OFMessage msg);
/**
* Add a message to the queue of the switch with specific priority.
*
- * @param sw Switch to which message is pushed.
+ * @param dpid DPID of the switch to which the message is pushed.
* @param msg Message object to be added.
* @param priority Sending priority of the message.
* @return true if message is successfully added to a queue.
*/
- boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority);
+ boolean add(Dpid dpid, OFMessage msg, MsgPriority priority);
/**
* Push a collection of Flow Entries to the corresponding switches
@@ -89,10 +89,10 @@
* Note: Notification is delivered for the Flow Entries that
* are pushed successfully.
*
- * @param entries the collection of <IOFSwitch, FlowEntry> pairs
+ * @param entries the collection of <Dpid, FlowEntry> pairs
* to push.
*/
- void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries);
+ void pushFlowEntries(Collection<Pair<Dpid, FlowEntry>> entries);
/**
* Push a collection of Flow Entries to the corresponding switches
@@ -101,11 +101,11 @@
* Note: Notification is delivered for the Flow Entries that
* are pushed successfully.
*
- * @param entries the collection of <IOFSwitch, FlowEntry> pairs
+ * @param entries the collection of <Dpid, FlowEntry> pairs
* to push.
* @param priority Sending priority of flow entries.
*/
- void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries,
+ void pushFlowEntries(Collection<Pair<Dpid, FlowEntry>> entries,
MsgPriority priority);
/**
@@ -115,10 +115,10 @@
* Note: Notification is delivered for the Flow Entries that
* are pushed successfully.
*
- * @param sw Switch to which message is pushed.
+ * @param dpid DPID of the switch to which the message is pushed.
* @param flowEntry FlowEntry object used for creating message.
*/
- void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry);
+ void pushFlowEntry(Dpid dpid, FlowEntry flowEntry);
/**
* Create a message from FlowEntry and add it to the queue of the
@@ -127,57 +127,61 @@
* Note: Notification is delivered for the Flow Entries that
* are pushed successfully.
*
- * @param sw Switch to which message is pushed.
+ * @param dpid DPID of the switch to which the message is pushed.
* @param flowEntry FlowEntry object used for creating message.
+ * @param priority Sending priority of flow entries.
*/
- void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry,
+ void pushFlowEntry(Dpid dpid, FlowEntry flowEntry,
MsgPriority priority);
/**
* Set sending rate to a switch.
+ * <p/>
+ * TODO The rate limiter function currently does not work as we are unable
+ * to determine the size of the messages when using Loxi.
*
- * @param sw Switch.
+ * @param dpid DPID of the switch to alter the sending rate of.
* @param rate Rate in bytes/ms.
*/
- public void setRate(IOFSwitch sw, long rate);
+ public void setRate(Dpid dpid, long rate);
/**
* Add BARRIER message to queue and wait for reply.
*
- * @param sw Switch to which barrier message is pushed.
+ * @param dpid DPID of the switch to which a barrier message is pushed.
* @return BARRIER_REPLY message sent from switch.
*/
- OFBarrierReply barrier(IOFSwitch sw);
+ OFBarrierReply barrier(Dpid dpid);
/**
* Add BARRIER message to queue asynchronously.
*
- * @param sw Switch to which barrier message is pushed.
+ * @param dpid DPID of the switch to which a barrier message is pushed.
* @return Future object of BARRIER_REPLY message which will be sent from switch.
*/
- OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw);
+ OFMessageFuture<OFBarrierReply> barrierAsync(Dpid dpid);
/**
* Suspend pushing message to a switch.
*
- * @param sw Switch to be suspended pushing message.
+ * @param dpid DPID of the switch whose queue is to be suspended.
* @return true if success
*/
- boolean suspend(IOFSwitch sw);
+ boolean suspend(Dpid dpid);
/**
* Resume pushing message to a switch.
*
- * @param sw Switch to be resumed pushing message.
+ * @param dpid DPID of the switch whose queue is to be resumed.
* @return true if success
*/
- boolean resume(IOFSwitch sw);
+ boolean resume(Dpid dpid);
/**
* Get state of queue attached to a switch.
*
- * @param sw Switch to be checked.
+ * @param dpid DPID of the switch to be checked.
* @return State of queue.
*/
- QueueState getState(IOFSwitch sw);
+ QueueState getState(Dpid dpid);
}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/ResumePusherResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/ResumePusherResource.java
index ecfec7e..c59fc0d 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/web/ResumePusherResource.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/ResumePusherResource.java
@@ -1,6 +1,6 @@
package net.onrc.onos.core.flowprogrammer.web;
-import net.floodlightcontroller.core.IOFSwitch;
+import net.onrc.onos.core.util.Dpid;
import org.projectfloodlight.openflow.util.HexString;
import org.restlet.resource.Get;
@@ -30,12 +30,6 @@
return false;
}
- IOFSwitch sw = provider.getSwitches().get(dpid);
- if (sw == null) {
- log.error("Invalid dpid");
- return false;
- }
-
- return pusher.resume(sw);
+ return pusher.resume(new Dpid(dpid));
}
}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SendBarrierResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SendBarrierResource.java
index bdcdcb6..f97ea9c 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SendBarrierResource.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SendBarrierResource.java
@@ -1,6 +1,6 @@
package net.onrc.onos.core.flowprogrammer.web;
-import net.floodlightcontroller.core.IOFSwitch;
+import net.onrc.onos.core.util.Dpid;
import org.projectfloodlight.openflow.protocol.OFBarrierReply;
import org.projectfloodlight.openflow.util.HexString;
@@ -30,12 +30,6 @@
return null;
}
- IOFSwitch sw = provider.getSwitches().get(dpid);
- if (sw == null) {
- log.error("Invalid dpid");
- return null;
- }
-
- return pusher.barrier(sw);
+ return pusher.barrier(new Dpid(dpid));
}
}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SetPushRateResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SetPushRateResource.java
index 63717a1..6ec156c 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SetPushRateResource.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SetPushRateResource.java
@@ -1,6 +1,6 @@
package net.onrc.onos.core.flowprogrammer.web;
-import net.floodlightcontroller.core.IOFSwitch;
+import net.onrc.onos.core.util.Dpid;
import org.projectfloodlight.openflow.util.HexString;
import org.restlet.resource.Get;
@@ -34,13 +34,7 @@
return false;
}
- IOFSwitch sw = provider.getSwitches().get(dpid);
- if (sw == null) {
- log.error("Invalid dpid");
- return false;
- }
-
- pusher.setRate(sw, rate);
+ pusher.setRate(new Dpid(dpid), rate);
return true;
}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SuspendPusherResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SuspendPusherResource.java
index e1aa320..0d53c9a 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SuspendPusherResource.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SuspendPusherResource.java
@@ -1,6 +1,6 @@
package net.onrc.onos.core.flowprogrammer.web;
-import net.floodlightcontroller.core.IOFSwitch;
+import net.onrc.onos.core.util.Dpid;
import org.projectfloodlight.openflow.util.HexString;
import org.restlet.resource.Get;
@@ -35,12 +35,6 @@
return false;
}
- IOFSwitch sw = provider.getSwitches().get(dpid);
- if (sw == null) {
- log.error("Invalid dpid");
- return false;
- }
-
- return pusher.suspend(sw);
+ return pusher.suspend(new Dpid(dpid));
}
}
diff --git a/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallRuntime.java b/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallRuntime.java
index d9a235b..60e2737 100644
--- a/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallRuntime.java
+++ b/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallRuntime.java
@@ -14,6 +14,7 @@
import net.floodlightcontroller.core.internal.OFMessageFuture;
import net.onrc.onos.core.flowprogrammer.IFlowPusherService;
import net.onrc.onos.core.intent.FlowEntry;
+import net.onrc.onos.core.util.Dpid;
import org.apache.commons.lang3.tuple.Pair;
import org.projectfloodlight.openflow.protocol.OFBarrierReply;
@@ -170,7 +171,7 @@
log.debug("IOFSwitches: {}", switches);
FlowModCount.startCount();
for (Set<FlowEntry> phase : plan) {
- Set<Pair<IOFSwitch, FlowEntry>> entries = new HashSet<>();
+ Set<Pair<Dpid, FlowEntry>> entries = new HashSet<>();
Set<IOFSwitch> modifiedSwitches = new HashSet<>();
long step1 = System.nanoTime();
@@ -182,7 +183,7 @@
log.debug("Skipping flow entry: {}", entry);
continue;
}
- entries.add(Pair.of(sw, entry));
+ entries.add(Pair.of(new Dpid(entry.getSwitch()), entry));
modifiedSwitches.add(sw);
FlowModCount.countFlowEntry(sw, entry);
}
@@ -197,7 +198,7 @@
// wait for confirmation messages before proceeding
List<Pair<IOFSwitch, OFMessageFuture<OFBarrierReply>>> barriers = new ArrayList<>();
for (IOFSwitch sw : modifiedSwitches) {
- barriers.add(Pair.of(sw, pusher.barrierAsync(sw)));
+ barriers.add(Pair.of(sw, pusher.barrierAsync(new Dpid(sw.getId()))));
}
for (Pair<IOFSwitch, OFMessageFuture<OFBarrierReply>> pair : barriers) {
IOFSwitch sw = pair.getLeft();
diff --git a/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java b/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java
index ce79994..f872e5c 100644
--- a/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java
+++ b/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java
@@ -254,7 +254,7 @@
.setActions(actions)
.build();
- flowPusher.add(sw, po);
+ flowPusher.add(new Dpid(sw.getId()), po);
}
}
diff --git a/src/test/java/net/onrc/onos/core/flowprogrammer/FlowPusherTest.java b/src/test/java/net/onrc/onos/core/flowprogrammer/FlowPusherTest.java
index 6ad1e5f..b770705 100644
--- a/src/test/java/net/onrc/onos/core/flowprogrammer/FlowPusherTest.java
+++ b/src/test/java/net/onrc/onos/core/flowprogrammer/FlowPusherTest.java
@@ -31,8 +31,10 @@
import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.onrc.onos.core.intent.FlowEntry;
import net.onrc.onos.core.intent.IntentOperation.Operator;
+import net.onrc.onos.core.util.Dpid;
import net.onrc.onos.core.util.IntegrationTest;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.projectfloodlight.openflow.protocol.OFBarrierReply;
@@ -68,11 +70,13 @@
public void testAddMessage() {
beginInitMock();
+ Dpid dpid = new Dpid(1L);
+
OFMessage msg = createMock(OFMessage.class);
expect(msg.getXid()).andReturn((long) 1).anyTimes();
replay(msg);
- IOFSwitch sw = createConnectedSwitchMock(1);
+ IOFSwitch sw = createConnectedSwitchMock(dpid.value());
try {
sw.write(eq(msg), eq((FloodlightContext) null));
@@ -85,7 +89,7 @@
endInitMock();
initPusher(1);
- boolean addResult = pusher.add(sw, msg);
+ boolean addResult = pusher.add(dpid, msg);
assertTrue(addResult);
try {
@@ -111,8 +115,9 @@
beginInitMock();
- IOFSwitch sw = createConnectedSwitchMock(1);
+ Dpid dpid = new Dpid(1L);
+ IOFSwitch sw = createConnectedSwitchMock(dpid.value());
List<OFMessage> messages = new ArrayList<OFMessage>();
@@ -134,7 +139,7 @@
initPusher(1);
for (OFMessage msg : messages) {
- boolean addResult = pusher.add(sw, msg);
+ boolean addResult = pusher.add(dpid, msg);
assertTrue(addResult);
}
@@ -164,9 +169,11 @@
beginInitMock();
- Map<IOFSwitch, List<OFMessage>> swMap = new HashMap<IOFSwitch, List<OFMessage>>();
+ Map<IOFSwitch, List<OFMessage>> swMap = new HashMap<>();
for (int i = 0; i < numSwitch; ++i) {
- IOFSwitch sw = createConnectedSwitchMock(i);
+ Dpid dpid = new Dpid(i);
+
+ IOFSwitch sw = createConnectedSwitchMock(dpid.value());
List<OFMessage> messages = new ArrayList<OFMessage>();
@@ -192,7 +199,7 @@
for (IOFSwitch sw : swMap.keySet()) {
for (OFMessage msg : swMap.get(sw)) {
- boolean addResult = pusher.add(sw, msg);
+ boolean addResult = pusher.add(new Dpid(sw.getId()), msg);
assertTrue(addResult);
}
}
@@ -226,7 +233,7 @@
beginInitMock();
- Map<IOFSwitch, List<OFMessage>> swMap = new HashMap<IOFSwitch, List<OFMessage>>();
+ Map<IOFSwitch, List<OFMessage>> swMap = new HashMap<>();
for (int i = 0; i < numThreads; ++i) {
IOFSwitch sw = createConnectedSwitchMock(i);
//EasyMock.replay(sw);
@@ -255,7 +262,7 @@
initPusher(numThreads);
for (IOFSwitch sw : swMap.keySet()) {
for (OFMessage msg : swMap.get(sw)) {
- boolean addResult = pusher.add(sw, msg);
+ boolean addResult = pusher.add(new Dpid(sw.getId()), msg);
assertTrue(addResult);
}
}
@@ -284,6 +291,20 @@
/**
* Test rate limitation of messages works correctly.
*/
+ // XXX Disabling this test for now.
+ // This test doesn't seem to be correctly testing the rate limiting feature.
+ // It tries to measure the time taken to push a set of messages, however the
+ // 'end' timestamp is actually taken when the expectation is set up, before
+ // any messages have been pushed. This means generally when measuredRate is
+ // calculated it is a negative value, so is always less than the allowedRate
+ // and the test passes.
+ // However I was seeing some random failures caused by a divide-by-zero
+ // error, which occurs when the 'end' and 'start' timestamps are taken in
+ // the same millisecond.
+ // Furthermore, rate limits are set in bytes/ms and the rate limiter relies
+ // on being able to get the size of the packets, which is not possible
+ // with Loxi messages. Therefore the rate limiter currently does not work.
+ @Ignore
@Test
public void testRateLimitedAddMessage() {
final long limitRate = 100; // [bytes/ms]
@@ -297,7 +318,9 @@
beginInitMock();
- IOFSwitch sw = createConnectedSwitchMock(1);
+ Dpid dpid = new Dpid(1L);
+
+ IOFSwitch sw = createConnectedSwitchMock(dpid.value());
List<OFMessage> messages = new ArrayList<OFMessage>();
@@ -328,16 +351,16 @@
endInitMock();
initPusher(1);
- pusher.createQueue(sw);
- pusher.setRate(sw, limitRate);
+ pusher.createQueue(dpid);
+ pusher.setRate(dpid, limitRate);
long beginTime = System.currentTimeMillis();
for (OFMessage msg : messages) {
- boolean addResult = pusher.add(sw, msg);
+ boolean addResult = pusher.add(dpid, msg);
assertTrue(addResult);
}
- pusher.barrierAsync(sw);
+ pusher.barrierAsync(dpid);
try {
do {
@@ -366,7 +389,9 @@
public void testBarrierMessage() {
beginInitMock();
- IOFSwitch sw = createConnectedSwitchMock(1);
+ Dpid dpid = new Dpid(1L);
+
+ IOFSwitch sw = createConnectedSwitchMock(dpid.value());
expect(sw.getOFVersion()).andReturn(OFVersion.OF_10).once();
try {
@@ -379,7 +404,7 @@
endInitMock();
initPusher(1);
- OFMessageFuture<OFBarrierReply> future = pusher.barrierAsync(sw);
+ OFMessageFuture<OFBarrierReply> future = pusher.barrierAsync(dpid);
assertNotNull(future);
@@ -403,19 +428,12 @@
@SuppressWarnings("unchecked")
@Test
public void testAddFlow() {
+ Dpid dpid = new Dpid(DPID_TO_VERIFY);
+
// instantiate required objects
- FlowEntry flowEntry1 = new FlowEntry(DPID_TO_VERIFY, 1, 11, null, null, 0, 0, Operator.ADD);
- /*
- flowEntry1.setDpid(new Dpid(DPID_TO_VERIFY));
- flowEntry1.setFlowId(new FlowId(1));
- flowEntry1.setInPort(PortNumber.uint16((short) 1));
- flowEntry1.setOutPort(PortNumber.uint16((short) 11));
- flowEntry1.setFlowEntryId(new FlowEntryId(1));
- flowEntry1.setFlowEntryMatch(new FlowEntryMatch());
- flowEntry1.setFlowEntryActions(new FlowEntryActions());
- flowEntry1.setFlowEntryErrorState(new FlowEntryErrorState());
- flowEntry1.setFlowEntryUserState(FlowEntryUserState.FE_USER_ADD);
- */
+ FlowEntry flowEntry1 =
+ new FlowEntry(DPID_TO_VERIFY, 1, 11, null,
+ null, 0, 0, Operator.ADD);
beginInitMock();
@@ -451,7 +469,7 @@
endInitMock();
initPusher(1);
- pusher.pushFlowEntry(sw, flowEntry1);
+ pusher.pushFlowEntry(dpid, flowEntry1);
try {
Thread.sleep(1000);
@@ -523,6 +541,9 @@
sw.flush();
expectLastCall().anyTimes();
+ expect(flProviderService.getMasterSwitch(dpid)).andReturn(sw)
+ .anyTimes();
+
return sw;
}
diff --git a/src/test/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizerTest.java b/src/test/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizerTest.java
index 654cb7a..b257dbd 100644
--- a/src/test/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizerTest.java
+++ b/src/test/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizerTest.java
@@ -22,6 +22,7 @@
import net.onrc.onos.core.flowprogrammer.IFlowPusherService.MsgPriority;
import net.onrc.onos.core.flowprogrammer.IFlowSyncService.SyncResult;
import net.onrc.onos.core.intent.FlowEntry;
+import net.onrc.onos.core.util.Dpid;
import org.easymock.IAnswer;
import org.junit.After;
@@ -67,9 +68,9 @@
idRemoved = new ArrayList<Long>();
pusher = createMock(FlowPusher.class);
- expect(pusher.suspend(anyObject(IOFSwitch.class))).andReturn(true).anyTimes();
- expect(pusher.resume(anyObject(IOFSwitch.class))).andReturn(true).anyTimes();
- pusher.add(anyObject(IOFSwitch.class), anyObject(OFMessage.class),
+ expect(pusher.suspend(anyObject(Dpid.class))).andReturn(true).anyTimes();
+ expect(pusher.resume(anyObject(Dpid.class))).andReturn(true).anyTimes();
+ pusher.add(anyObject(Dpid.class), anyObject(OFMessage.class),
eq(MsgPriority.HIGH));
expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
@@ -84,7 +85,7 @@
return null;
}
}).anyTimes();
- pusher.pushFlowEntry(anyObject(IOFSwitch.class), anyObject(FlowEntry.class),
+ pusher.pushFlowEntry(anyObject(Dpid.class), anyObject(FlowEntry.class),
eq(MsgPriority.HIGH));
expectLastCall().andAnswer(new IAnswer<Object>() {
@Override