Cherry-pick from https://gerrit.onos.onlab.us/#/c/92/4 (partially).
NOTE: Abandoned the implementation about automatic-barrier function.
FlowEntryAdded event is changed to be occurred after reception of BARRIER_REPLY (ONOS-915).
Fixed a bug that the same FlowEntry is installed to switch multiple times.
Change-Id: Ibb5cdaf9a478e2697232203d9190dedb4792f108
Fixed a bug that FlowEntries are shallow-copied in FlowPusher.
Change-Id: I704d245d3660a91f5e60fd6cdf05f89b1fba6c25
Fixed a problem that FlowPusherTest fails often(ONOS-984).
Change-Id: I2a290d5dc756e43a8058c6f35372f2e2932a8d73
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 21be62c..d2af973 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -11,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
@@ -29,7 +30,6 @@
import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.floodlightcontroller.util.MACAddress;
import net.floodlightcontroller.util.OFMessageDamper;
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
import net.onrc.onos.ofcontroller.util.FlowEntryAction;
import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
import net.onrc.onos.ofcontroller.util.FlowEntry;
@@ -54,7 +54,6 @@
*/
public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
- protected volatile IFlowService flowManager;
// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
@@ -70,6 +69,18 @@
READY,
SUSPENDED,
}
+
+ private static class SwitchQueueEntry {
+ OFMessage msg;
+
+ public SwitchQueueEntry(OFMessage msg) {
+ this.msg = msg;
+ }
+
+ public OFMessage getOFMessage() {
+ return msg;
+ }
+ }
/**
* SwitchQueue represents message queue attached to a switch.
@@ -77,7 +88,7 @@
* @author Naoki Shiota
*
*/
- private class SwitchQueue extends ArrayDeque<OFMessage> {
+ private class SwitchQueue extends ArrayDeque<SwitchQueueEntry> {
private static final long serialVersionUID = 1L;
QueueState state;
@@ -119,6 +130,52 @@
last_sent_time = current;
last_sent_size = size;
}
+ }
+
+ /**
+ * BarrierInfo holds information to specify barrier message sent to switch.
+ * @author Naoki
+ */
+ private static class BarrierInfo {
+ final long dpid;
+ final int xid;
+
+ static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
+ return new BarrierInfo(sw.getId(), req.getXid());
+ }
+
+ static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
+ return new BarrierInfo(sw.getId(), rpy.getXid());
+ }
+
+ private BarrierInfo(long dpid, int xid) {
+ this.dpid = dpid;
+ this.xid = xid;
+ }
+
+ // Auto generated code by Eclipse
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (dpid ^ (dpid >>> 32));
+ result = prime * result + xid;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ BarrierInfo other = (BarrierInfo) obj;
+ return (this.dpid == other.dpid) && (this.xid == other.xid);
+ }
+
}
@@ -130,9 +187,9 @@
// Map of threads versus dpid
private Map<Long, FlowPusherThread> threadMap = null;
- // Map of Future objects versus dpid and transaction ID.
- private Map<Long, Map<Integer, OFBarrierReplyFuture>>
- barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
+ // Map from (DPID and transaction ID) to Future objects.
+ private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
+ = new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
private int number_thread = 1;
@@ -143,7 +200,7 @@
*/
private class FlowPusherThread extends Thread {
private Map<IOFSwitch,SwitchQueue> queues
- = new HashMap<IOFSwitch,SwitchQueue>();
+ = new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
// reusable latch used for waiting for arrival of message
private Semaphore mutex = new Semaphore(0);
@@ -163,10 +220,8 @@
// for safety of concurrent access, copy all key objects
Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
- synchronized (queues) {
- for (IOFSwitch sw : queues.keySet()) {
- keys.add(sw);
- }
+ for (IOFSwitch sw : queues.keySet()) {
+ keys.add(sw);
}
for (IOFSwitch sw : keys) {
@@ -183,9 +238,7 @@
if (queue.isEmpty()) {
// remove queue if flagged to be.
if (queue.toBeDeleted) {
- synchronized (queues) {
- queues.remove(sw);
- }
+ queues.remove(sw);
}
} else {
// if some messages remains in queue, latch down
@@ -206,7 +259,7 @@
* @param max_msg Limitation of number of messages to be sent. If set to 0,
* all messages in queue will be sent.
*/
- private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
+ private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
// check sending rate and determine it to be sent or not
long current_time = System.currentTimeMillis();
long size = 0;
@@ -220,16 +273,24 @@
}
++i;
- OFMessage msg = queue.poll();
+ SwitchQueueEntry queueEntry;
+ synchronized (queue) {
+ queueEntry = queue.poll();
+ }
+
+ OFMessage msg = queueEntry.getOFMessage();
try {
messageDamper.write(sw, msg, context);
-// log.debug("Pusher sends message : {}", msg);
+ if (log.isTraceEnabled()) {
+ log.trace("Pusher sends message : {}", msg);
+ }
size += msg.getLength();
} catch (IOException e) {
e.printStackTrace();
log.error("Exception in sending message ({}) : {}", msg, e);
}
}
+
sw.flush();
queue.logSentData(current_time, size);
}
@@ -269,7 +330,6 @@
this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
- flowManager = modContext.getServiceImpl(IFlowService.class);
if (damper != null) {
messageDamper = damper;
@@ -374,7 +434,9 @@
if (rate > 0) {
log.debug("rate for {} is set to {}", sw.getId(), rate);
- queue.max_rate = rate;
+ synchronized (queue) {
+ queue.max_rate = rate;
+ }
}
}
@@ -388,9 +450,7 @@
FlowPusherThread proc = getProcess(sw);
queue = new SwitchQueue();
queue.state = QueueState.READY;
- synchronized (proc.queues) {
- proc.queues.put(sw, queue);
- }
+ proc.queues.put(sw, queue);
return true;
}
@@ -405,11 +465,9 @@
FlowPusherThread proc = getProcess(sw);
if (forceStop) {
- synchronized (proc.queues) {
- SwitchQueue queue = proc.queues.remove(sw);
- if (queue == null) {
- return false;
- }
+ SwitchQueue queue = proc.queues.remove(sw);
+ if (queue == null) {
+ return false;
}
return true;
} else {
@@ -426,48 +484,16 @@
@Override
public boolean add(IOFSwitch sw, OFMessage msg) {
- FlowPusherThread proc = getProcess(sw);
- SwitchQueue queue = proc.queues.get(sw);
-
- // create queue at first addition of message
- if (queue == null) {
- createQueue(sw);
- queue = getQueue(sw);
- }
-
- synchronized (queue) {
- queue.add(msg);
-// log.debug("Message is pushed : {}", msg);
- }
-
- if (proc.mutex.availablePermits() == 0) {
- proc.mutex.release();
- }
-
- return true;
+ return addMessageImpl(sw, msg);
}
-
+
@Override
public void pushFlowEntries(
Collection<Pair<IOFSwitch, FlowEntry>> entries) {
- List<Pair<IOFSwitch, FlowEntry>> pushedEntries =
- new LinkedList<Pair<IOFSwitch, FlowEntry>>();
-
for (Pair<IOFSwitch, FlowEntry> entry : entries) {
- if (add(entry.first, entry.second)) {
- pushedEntries.add(entry);
- }
+ add(entry.first, entry.second);
}
-
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and update the SwitchState
- // for a flow entry after the Barrier message is
- // is received.
- // Only after inform the Flow Manager that the entry is pushed.
- //
- flowManager.flowEntriesPushedToSwitch(pushedEntries);
}
@Override
@@ -755,7 +781,39 @@
, actionOutputPort
);
- return add(sw, fm);
+ return addMessageImpl(sw, fm);
+ }
+
+ /**
+ * Add message to queue
+ * @param sw
+ * @param msg
+ * @param flowEntryId
+ * @return
+ */
+ protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg) {
+ FlowPusherThread proc = getProcess(sw);
+ SwitchQueue queue = proc.queues.get(sw);
+
+ // create queue at first addition of message
+ if (queue == null) {
+ createQueue(sw);
+ queue = getQueue(sw);
+ }
+
+ SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+ synchronized (queue) {
+ queue.add(entry);
+ if (log.isTraceEnabled()) {
+ log.trace("Message is pushed : {}", entry.getOFMessage());
+ }
+ }
+
+ if (proc.mutex.availablePermits() == 0) {
+ proc.mutex.release();
+ }
+
+ return true;
}
@Override
@@ -786,24 +844,23 @@
return null;
}
- OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
- msg.setXid(sw.getNextTransactionId());
-
- OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
- synchronized (barrierFutures) {
- Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
- if (map == null) {
- map = new HashMap<Integer,OFBarrierReplyFuture>();
- barrierFutures.put(sw.getId(), map);
- }
- map.put(msg.getXid(), future);
- }
+ OFBarrierRequest msg = createBarrierRequest(sw);
- add(sw, msg);
+ OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+ barrierFutures.put(BarrierInfo.create(sw,msg), future);
+
+ addMessageImpl(sw, msg);
return future;
}
+ protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
+ OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+ msg.setXid(sw.getNextTransactionId());
+
+ return msg;
+ }
+
/**
* Get a queue attached to a switch.
* @param sw Switch object
@@ -838,7 +895,7 @@
return threadMap.get(hash);
}
-
+
@Override
public String getName() {
return "flowpusher";
@@ -856,22 +913,25 @@
@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
- Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
- if (map == null) {
- log.debug("null map for {} : {}", sw.getId(), barrierFutures);
+ if (log.isTraceEnabled()) {
+ log.trace("Received BARRIER_REPLY from : {}", sw.getId());
+ }
+
+ if (msg.getType() != OFType.BARRIER_REPLY) {
+ log.error("Unexpected reply message : {}", msg.getType());
return Command.CONTINUE;
}
- OFBarrierReplyFuture future = map.get(msg.getXid());
- if (future == null) {
- log.debug("null future for {} : {}", msg.getXid(), map);
- return Command.CONTINUE;
- }
+ OFBarrierReply reply = (OFBarrierReply) msg;
+ BarrierInfo info = BarrierInfo.create(sw,reply);
- log.debug("Received BARRIER_REPLY : {}", msg);
- future.deliverFuture(sw, msg);
+ // Deliver future if exists
+ OFBarrierReplyFuture future = barrierFutures.get(info);
+ if (future != null) {
+ future.deliverFuture(sw, msg);
+ barrierFutures.remove(info);
+ }
return Command.CONTINUE;
}
-
}
diff --git a/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java
index 4779b75..ae61707 100644
--- a/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java
+++ b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java
@@ -4,7 +4,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -17,7 +16,6 @@
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.floodlightcontroller.util.OFMessageDamper;
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryActions;
@@ -31,6 +29,7 @@
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Test;
+import org.openflow.protocol.OFBarrierReply;
import org.openflow.protocol.OFBarrierRequest;
import org.openflow.protocol.OFFlowMod;
import org.openflow.protocol.OFMatch;
@@ -47,7 +46,6 @@
private OFMessageDamper damper;
private IFloodlightProviderService flProviderService;
private IThreadPoolService threadPoolService;
- private IFlowService flowService;
/**
* Test single OFMessage is correctly sent to single switch via MessageDamper.
@@ -86,9 +84,9 @@
} catch (InterruptedException e) {
fail("Failed in Thread.sleep()");
}
-
EasyMock.verify(msg);
EasyMock.verify(sw);
+ verifyAll();
pusher.stop();
}
@@ -135,7 +133,7 @@
try {
// wait until message is processed.
- Thread.sleep(1000);
+ Thread.sleep(5000);
} catch (InterruptedException e) {
fail("Failed in Thread.sleep()");
}
@@ -144,6 +142,7 @@
EasyMock.verify(msg);
}
EasyMock.verify(sw);
+ verifyAll();
pusher.stop();
}
@@ -209,7 +208,8 @@
EasyMock.verify(sw);
}
-
+ verifyAll();
+
pusher.stop();
}
@@ -274,7 +274,8 @@
EasyMock.verify(sw);
}
-
+ verifyAll();
+
pusher.stop();
}
@@ -365,7 +366,8 @@
EasyMock.verify(msg);
}
EasyMock.verify(sw);
-
+ verifyAll();
+
pusher.stop();
}
@@ -396,9 +398,20 @@
OFBarrierReplyFuture future = pusher.barrierAsync(sw);
assertNotNull(future);
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail("Failed to sleep");
+ }
+
+ verifyAll();
+
pusher.stop();
}
+ static final int XID_TO_VERIFY = 100;
+ static final long DPID_TO_VERIFY = 10;
/**
* Test FlowObject is correctly converted to message and is sent to a switch.
*/
@@ -409,7 +422,7 @@
// instantiate required objects
FlowEntry flowEntry1 = new FlowEntry();
- flowEntry1.setDpid(new Dpid(1));
+ flowEntry1.setDpid(new Dpid(DPID_TO_VERIFY));
flowEntry1.setFlowId(new FlowId(1));
flowEntry1.setInPort(new Port((short) 1));
flowEntry1.setOutPort(new Port((short) 11));
@@ -432,25 +445,18 @@
EasyMock.expect(msg.setActions((List<OFAction>)EasyMock.anyObject())).andReturn(msg);
EasyMock.expect(msg.setLengthU(EasyMock.anyShort())).andReturn(msg);
EasyMock.expect(msg.setOutPort(EasyMock.anyShort())).andReturn(msg).atLeastOnce();
- EasyMock.expect(msg.getXid()).andReturn(1).anyTimes();
+ EasyMock.expect(msg.getXid()).andReturn(XID_TO_VERIFY).anyTimes();
EasyMock.expect(msg.getType()).andReturn(OFType.FLOW_MOD).anyTimes();
EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
EasyMock.replay(msg);
EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.FLOW_MOD))).andReturn(msg);
-
- ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
- EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
- (TimeUnit)EasyMock.anyObject())).andReturn(null).once();
- EasyMock.replay(executor);
- EasyMock.expect(threadPoolService.getScheduledExecutor()).andReturn(executor);
IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
- EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+ EasyMock.expect(sw.getId()).andReturn(DPID_TO_VERIFY).anyTimes();
EasyMock.expect(sw.getStringId()).andReturn("1").anyTimes();
sw.flush();
EasyMock.expectLastCall().once();
- EasyMock.replay(sw);
try {
EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.anyObject(OFMessage.class), EasyMock.eq(context)))
@@ -458,14 +464,18 @@
@Override
public Boolean answer() throws Throwable {
OFMessage msg = (OFMessage)EasyMock.getCurrentArguments()[1];
- assertEquals(msg.getType(), OFType.FLOW_MOD);
+ if (msg.getType() == OFType.FLOW_MOD) {
+ assertEquals(msg.getXid(), XID_TO_VERIFY);
+ }
return true;
}
- }).once();
+ }).atLeastOnce();
} catch (IOException e1) {
fail("Failed in OFMessageDamper#write()");
}
+ EasyMock.replay(sw);
+
endInitMock();
initPusher(1);
@@ -478,11 +488,11 @@
}
EasyMock.verify(sw);
-
+ verifyAll();
+
pusher.stop();
}
- @SuppressWarnings("unchecked")
private void beginInitMock() {
context = EasyMock.createMock(FloodlightContext.class);
modContext = EasyMock.createMock(FloodlightModuleContext.class);
@@ -490,24 +500,23 @@
damper = EasyMock.createMock(OFMessageDamper.class);
flProviderService = EasyMock.createMock(IFloodlightProviderService.class);
threadPoolService = EasyMock.createMock(IThreadPoolService.class);
- flowService = EasyMock.createMock(IFlowService.class);
-
- flowService.flowEntriesPushedToSwitch(EasyMock.anyObject(Collection.class));
- EasyMock.expectLastCall().anyTimes();
EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IThreadPoolService.class)))
.andReturn(threadPoolService).once();
EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IFloodlightProviderService.class)))
.andReturn(flProviderService).once();
- EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IFlowService.class)))
- .andReturn(flowService).once();
flProviderService.addOFMessageListener(EasyMock.eq(OFType.BARRIER_REPLY),
(FlowPusher) EasyMock.anyObject());
EasyMock.expectLastCall().once();
+
+ ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
+ EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
+ (TimeUnit)EasyMock.anyObject())).andReturn(null).once();
+ EasyMock.replay(executor);
+ EasyMock.expect(threadPoolService.getScheduledExecutor()).andReturn(executor).anyTimes();
}
private void endInitMock() {
- EasyMock.replay(flowService);
EasyMock.replay(threadPoolService);
EasyMock.replay(flProviderService);
EasyMock.replay(damper);
@@ -516,6 +525,15 @@
EasyMock.replay(context);
}
+ private void verifyAll() {
+ EasyMock.verify(threadPoolService);
+ EasyMock.verify(flProviderService);
+ EasyMock.verify(damper);
+ EasyMock.verify(factory);
+ EasyMock.verify(modContext);
+ EasyMock.verify(context);
+ }
+
private void initPusher(int num_thread) {
pusher = new FlowPusher(num_thread);
pusher.init(context, modContext, factory, damper);
@@ -530,14 +548,7 @@
EasyMock.expect(req.getType()).andReturn(OFType.BARRIER_REQUEST).anyTimes();
EasyMock.expect(req.getLength()).andReturn((short)100).anyTimes();
EasyMock.replay(req);
- EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.BARRIER_REQUEST))).andReturn(req);
-
- ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
- EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
- (TimeUnit)EasyMock.anyObject())).andReturn(null).once();
- EasyMock.replay(executor);
- EasyMock.expect(threadPoolService.getScheduledExecutor()).andReturn(executor);
-
+ EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.BARRIER_REQUEST))).andReturn(req).anyTimes();
EasyMock.expect(sw.getNextTransactionId()).andReturn(1);
}