Cherry-pick from https://gerrit.onos.onlab.us/#/c/92/4 (partially).
NOTE: Abandoned the implementation of the automatic-barrier function.
The FlowEntryAdded event now occurs after reception of BARRIER_REPLY (ONOS-915).
Fixed a bug where the same FlowEntry was installed on a switch multiple times.
Change-Id: Ibb5cdaf9a478e2697232203d9190dedb4792f108
Fixed a bug where FlowEntries were shallow-copied in FlowPusher.
Change-Id: I704d245d3660a91f5e60fd6cdf05f89b1fba6c25
Fixed a problem where FlowPusherTest often failed (ONOS-984).
Change-Id: I2a290d5dc756e43a8058c6f35372f2e2932a8d73
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 21be62c..d2af973 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -11,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
@@ -29,7 +30,6 @@
import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.floodlightcontroller.util.MACAddress;
import net.floodlightcontroller.util.OFMessageDamper;
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
import net.onrc.onos.ofcontroller.util.FlowEntryAction;
import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
import net.onrc.onos.ofcontroller.util.FlowEntry;
@@ -54,7 +54,6 @@
*/
public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
- protected volatile IFlowService flowManager;
// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
@@ -70,6 +69,18 @@
READY,
SUSPENDED,
}
+
+ private static class SwitchQueueEntry {
+ OFMessage msg;
+
+ public SwitchQueueEntry(OFMessage msg) {
+ this.msg = msg;
+ }
+
+ public OFMessage getOFMessage() {
+ return msg;
+ }
+ }
/**
* SwitchQueue represents message queue attached to a switch.
@@ -77,7 +88,7 @@
* @author Naoki Shiota
*
*/
- private class SwitchQueue extends ArrayDeque<OFMessage> {
+ private class SwitchQueue extends ArrayDeque<SwitchQueueEntry> {
private static final long serialVersionUID = 1L;
QueueState state;
@@ -119,6 +130,52 @@
last_sent_time = current;
last_sent_size = size;
}
+ }
+
+ /**
+ * BarrierInfo holds the information (DPID and transaction ID) that identifies a barrier message sent to a switch.
+ * @author Naoki
+ */
+ private static class BarrierInfo {
+ final long dpid;
+ final int xid;
+
+ static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
+ return new BarrierInfo(sw.getId(), req.getXid());
+ }
+
+ static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
+ return new BarrierInfo(sw.getId(), rpy.getXid());
+ }
+
+ private BarrierInfo(long dpid, int xid) {
+ this.dpid = dpid;
+ this.xid = xid;
+ }
+
+ // Auto generated code by Eclipse
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (dpid ^ (dpid >>> 32));
+ result = prime * result + xid;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ BarrierInfo other = (BarrierInfo) obj;
+ return (this.dpid == other.dpid) && (this.xid == other.xid);
+ }
+
}
@@ -130,9 +187,9 @@
// Map of threads versus dpid
private Map<Long, FlowPusherThread> threadMap = null;
- // Map of Future objects versus dpid and transaction ID.
- private Map<Long, Map<Integer, OFBarrierReplyFuture>>
- barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
+ // Map from (DPID and transaction ID) to Future objects.
+ private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
+ = new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
private int number_thread = 1;
@@ -143,7 +200,7 @@
*/
private class FlowPusherThread extends Thread {
private Map<IOFSwitch,SwitchQueue> queues
- = new HashMap<IOFSwitch,SwitchQueue>();
+ = new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
// reusable latch used for waiting for arrival of message
private Semaphore mutex = new Semaphore(0);
@@ -163,10 +220,8 @@
// for safety of concurrent access, copy all key objects
Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
- synchronized (queues) {
- for (IOFSwitch sw : queues.keySet()) {
- keys.add(sw);
- }
+ for (IOFSwitch sw : queues.keySet()) {
+ keys.add(sw);
}
for (IOFSwitch sw : keys) {
@@ -183,9 +238,7 @@
if (queue.isEmpty()) {
// remove queue if flagged to be.
if (queue.toBeDeleted) {
- synchronized (queues) {
- queues.remove(sw);
- }
+ queues.remove(sw);
}
} else {
// if some messages remains in queue, latch down
@@ -206,7 +259,7 @@
* @param max_msg Limitation of number of messages to be sent. If set to 0,
* all messages in queue will be sent.
*/
- private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
+ private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
// check sending rate and determine it to be sent or not
long current_time = System.currentTimeMillis();
long size = 0;
@@ -220,16 +273,24 @@
}
++i;
- OFMessage msg = queue.poll();
+ SwitchQueueEntry queueEntry;
+ synchronized (queue) {
+ queueEntry = queue.poll();
+ }
+
+ OFMessage msg = queueEntry.getOFMessage();
try {
messageDamper.write(sw, msg, context);
-// log.debug("Pusher sends message : {}", msg);
+ if (log.isTraceEnabled()) {
+ log.trace("Pusher sends message : {}", msg);
+ }
size += msg.getLength();
} catch (IOException e) {
e.printStackTrace();
log.error("Exception in sending message ({}) : {}", msg, e);
}
}
+
sw.flush();
queue.logSentData(current_time, size);
}
@@ -269,7 +330,6 @@
this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
- flowManager = modContext.getServiceImpl(IFlowService.class);
if (damper != null) {
messageDamper = damper;
@@ -374,7 +434,9 @@
if (rate > 0) {
log.debug("rate for {} is set to {}", sw.getId(), rate);
- queue.max_rate = rate;
+ synchronized (queue) {
+ queue.max_rate = rate;
+ }
}
}
@@ -388,9 +450,7 @@
FlowPusherThread proc = getProcess(sw);
queue = new SwitchQueue();
queue.state = QueueState.READY;
- synchronized (proc.queues) {
- proc.queues.put(sw, queue);
- }
+ proc.queues.put(sw, queue);
return true;
}
@@ -405,11 +465,9 @@
FlowPusherThread proc = getProcess(sw);
if (forceStop) {
- synchronized (proc.queues) {
- SwitchQueue queue = proc.queues.remove(sw);
- if (queue == null) {
- return false;
- }
+ SwitchQueue queue = proc.queues.remove(sw);
+ if (queue == null) {
+ return false;
}
return true;
} else {
@@ -426,48 +484,16 @@
@Override
public boolean add(IOFSwitch sw, OFMessage msg) {
- FlowPusherThread proc = getProcess(sw);
- SwitchQueue queue = proc.queues.get(sw);
-
- // create queue at first addition of message
- if (queue == null) {
- createQueue(sw);
- queue = getQueue(sw);
- }
-
- synchronized (queue) {
- queue.add(msg);
-// log.debug("Message is pushed : {}", msg);
- }
-
- if (proc.mutex.availablePermits() == 0) {
- proc.mutex.release();
- }
-
- return true;
+ return addMessageImpl(sw, msg);
}
-
+
@Override
public void pushFlowEntries(
Collection<Pair<IOFSwitch, FlowEntry>> entries) {
- List<Pair<IOFSwitch, FlowEntry>> pushedEntries =
- new LinkedList<Pair<IOFSwitch, FlowEntry>>();
-
for (Pair<IOFSwitch, FlowEntry> entry : entries) {
- if (add(entry.first, entry.second)) {
- pushedEntries.add(entry);
- }
+ add(entry.first, entry.second);
}
-
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and update the SwitchState
- // for a flow entry after the Barrier message is
- // is received.
- // Only after inform the Flow Manager that the entry is pushed.
- //
- flowManager.flowEntriesPushedToSwitch(pushedEntries);
}
@Override
@@ -755,7 +781,39 @@
, actionOutputPort
);
- return add(sw, fm);
+ return addMessageImpl(sw, fm);
+ }
+
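+ /**
+ * Add a message to the queue attached to a switch.
+ * The queue is created on the first addition of a message.
+ * @param sw Switch whose queue receives the message.
+ * @param msg Message to be queued.
+ * @return true (the current implementation always returns true).
+ */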
+ protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg) {
+ FlowPusherThread proc = getProcess(sw);
+ SwitchQueue queue = proc.queues.get(sw);
+
+ // create queue at first addition of message
+ if (queue == null) {
+ createQueue(sw);
+ queue = getQueue(sw);
+ }
+
+ SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+ synchronized (queue) {
+ queue.add(entry);
+ if (log.isTraceEnabled()) {
+ log.trace("Message is pushed : {}", entry.getOFMessage());
+ }
+ }
+
+ if (proc.mutex.availablePermits() == 0) {
+ proc.mutex.release();
+ }
+
+ return true;
}
@Override
@@ -786,24 +844,23 @@
return null;
}
- OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
- msg.setXid(sw.getNextTransactionId());
-
- OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
- synchronized (barrierFutures) {
- Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
- if (map == null) {
- map = new HashMap<Integer,OFBarrierReplyFuture>();
- barrierFutures.put(sw.getId(), map);
- }
- map.put(msg.getXid(), future);
- }
+ OFBarrierRequest msg = createBarrierRequest(sw);
- add(sw, msg);
+ OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+ barrierFutures.put(BarrierInfo.create(sw,msg), future);
+
+ addMessageImpl(sw, msg);
return future;
}
+ protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
+ OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+ msg.setXid(sw.getNextTransactionId());
+
+ return msg;
+ }
+
/**
* Get a queue attached to a switch.
* @param sw Switch object
@@ -838,7 +895,7 @@
return threadMap.get(hash);
}
-
+
@Override
public String getName() {
return "flowpusher";
@@ -856,22 +913,25 @@
@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
- Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
- if (map == null) {
- log.debug("null map for {} : {}", sw.getId(), barrierFutures);
+ if (log.isTraceEnabled()) {
+ log.trace("Received BARRIER_REPLY from : {}", sw.getId());
+ }
+
+ if (msg.getType() != OFType.BARRIER_REPLY) {
+ log.error("Unexpected reply message : {}", msg.getType());
return Command.CONTINUE;
}
- OFBarrierReplyFuture future = map.get(msg.getXid());
- if (future == null) {
- log.debug("null future for {} : {}", msg.getXid(), map);
- return Command.CONTINUE;
- }
+ OFBarrierReply reply = (OFBarrierReply) msg;
+ BarrierInfo info = BarrierInfo.create(sw,reply);
- log.debug("Received BARRIER_REPLY : {}", msg);
- future.deliverFuture(sw, msg);
+ // Deliver future if exists
+ OFBarrierReplyFuture future = barrierFutures.get(info);
+ if (future != null) {
+ future.deliverFuture(sw, msg);
+ barrierFutures.remove(info);
+ }
return Command.CONTINUE;
}
-
}