blob: 4779b756662491e785bdcef0c170d4084edd03fe [file] [log] [blame]
package net.onrc.onos.ofcontroller.flowprogrammer;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.floodlightcontroller.util.OFMessageDamper;
import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryActions;
import net.onrc.onos.ofcontroller.util.FlowEntryErrorState;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.Port;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Test;
import org.openflow.protocol.OFBarrierRequest;
import org.openflow.protocol.OFFlowMod;
import org.openflow.protocol.OFMatch;
import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFType;
import org.openflow.protocol.action.OFAction;
import org.openflow.protocol.factory.BasicFactory;
public class FlowPusherTest {
private FlowPusher pusher;
private FloodlightContext context;
private FloodlightModuleContext modContext;
private BasicFactory factory;
private OFMessageDamper damper;
private IFloodlightProviderService flProviderService;
private IThreadPoolService threadPoolService;
private IFlowService flowService;
/**
* Test single OFMessage is correctly sent to single switch via MessageDamper.
*/
@Test
public void testAddMessage() {
beginInitMock();
OFMessage msg = EasyMock.createMock(OFMessage.class);
EasyMock.expect(msg.getXid()).andReturn(1).anyTimes();
EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
EasyMock.replay(msg);
IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
sw.flush();
EasyMock.expectLastCall().once();
EasyMock.replay(sw);
try {
EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
.andReturn(true).once();
} catch (IOException e1) {
fail("Failed in OFMessageDamper#write()");
}
endInitMock();
initPusher(1);
boolean add_result = pusher.add(sw, msg);
assertTrue(add_result);
try {
// wait until message is processed.
Thread.sleep(1000);
} catch (InterruptedException e) {
fail("Failed in Thread.sleep()");
}
EasyMock.verify(msg);
EasyMock.verify(sw);
pusher.stop();
}
/**
* Test bunch of OFMessages are correctly sent to single switch via MessageDamper.
*/
@Test
public void testMassiveAddMessage() {
final int NUM_MSG = 10000;
beginInitMock();
IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
sw.flush();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(sw);
List<OFMessage> messages = new ArrayList<OFMessage>();
for (int i = 0; i < NUM_MSG; ++i) {
OFMessage msg = EasyMock.createMock(OFMessage.class);
EasyMock.expect(msg.getXid()).andReturn(i).anyTimes();
EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
EasyMock.replay(msg);
messages.add(msg);
try {
EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
.andReturn(true).once();
} catch (IOException e1) {
fail("Failed in OFMessageDamper#write()");
}
}
endInitMock();
initPusher(1);
for (OFMessage msg : messages) {
boolean add_result = pusher.add(sw, msg);
assertTrue(add_result);
}
try {
// wait until message is processed.
Thread.sleep(1000);
} catch (InterruptedException e) {
fail("Failed in Thread.sleep()");
}
for (OFMessage msg : messages) {
EasyMock.verify(msg);
}
EasyMock.verify(sw);
pusher.stop();
}
/**
* Test bunch of OFMessages are correctly sent to multiple switches with single threads.
*/
@Test
public void testMultiSwitchAddMessage() {
final int NUM_SWITCH = 10;
final int NUM_MSG = 100; // messages per thread
beginInitMock();
Map<IOFSwitch, List<OFMessage>> sw_map = new HashMap<IOFSwitch, List<OFMessage>>();
for (int i = 0; i < NUM_SWITCH; ++i) {
IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
EasyMock.expect(sw.getId()).andReturn((long)i).anyTimes();
sw.flush();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(sw);
List<OFMessage> messages = new ArrayList<OFMessage>();
for (int j = 0; j < NUM_MSG; ++j) {
OFMessage msg = EasyMock.createMock(OFMessage.class);
EasyMock.expect(msg.getXid()).andReturn(j).anyTimes();
EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
EasyMock.replay(msg);
messages.add(msg);
try {
EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
.andReturn(true).once();
} catch (IOException e1) {
fail("Failed in OFMessageDamper#write()");
}
}
sw_map.put(sw, messages);
}
endInitMock();
initPusher(1);
for (IOFSwitch sw : sw_map.keySet()) {
for (OFMessage msg : sw_map.get(sw)) {
boolean add_result = pusher.add(sw, msg);
assertTrue(add_result);
}
}
try {
// wait until message is processed.
Thread.sleep(1000);
} catch (InterruptedException e) {
fail("Failed in Thread.sleep()");
}
for (IOFSwitch sw : sw_map.keySet()) {
for (OFMessage msg : sw_map.get(sw)) {
EasyMock.verify(msg);
}
EasyMock.verify(sw);
}
pusher.stop();
}
/**
* Test bunch of OFMessages are correctly sent to multiple switches using multiple threads.
*/
@Test
public void testMultiThreadedAddMessage() {
final int NUM_THREAD = 10;
final int NUM_MSG = 100; // messages per thread
beginInitMock();
Map<IOFSwitch, List<OFMessage>> sw_map = new HashMap<IOFSwitch, List<OFMessage>>();
for (int i = 0; i < NUM_THREAD; ++i) {
IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
EasyMock.expect(sw.getId()).andReturn((long)i).anyTimes();
sw.flush();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(sw);
List<OFMessage> messages = new ArrayList<OFMessage>();
for (int j = 0; j < NUM_MSG; ++j) {
OFMessage msg = EasyMock.createMock(OFMessage.class);
EasyMock.expect(msg.getXid()).andReturn(j).anyTimes();
EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
EasyMock.replay(msg);
messages.add(msg);
try {
EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
.andReturn(true).once();
} catch (IOException e1) {
fail("Failed in OFMessageDamper#write()");
}
}
sw_map.put(sw, messages);
}
endInitMock();
initPusher(NUM_THREAD);
for (IOFSwitch sw : sw_map.keySet()) {
for (OFMessage msg : sw_map.get(sw)) {
boolean add_result = pusher.add(sw, msg);
assertTrue(add_result);
}
}
try {
// wait until message is processed.
Thread.sleep(1000);
} catch (InterruptedException e) {
fail("Failed in Thread.sleep()");
}
for (IOFSwitch sw : sw_map.keySet()) {
for (OFMessage msg : sw_map.get(sw)) {
EasyMock.verify(msg);
}
EasyMock.verify(sw);
}
pusher.stop();
}
private long barrierTime = 0;
/**
* Test rate limitation of messages works correctly.
*/
@Test
public void testRateLimitedAddMessage() {
final long LIMIT_RATE = 100; // [bytes/ms]
final int NUM_MSG = 1000;
// Accuracy of FlowPusher's rate calculation can't be measured by unit test
// because switch doesn't return BARRIER_REPLY.
// In unit test we use approximate way to measure rate. This value is
// acceptable margin of measured rate.
final double ACCEPTABLE_RATE = LIMIT_RATE * 1.2;
beginInitMock();
IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
sw.flush();
EasyMock.expectLastCall().atLeastOnce();
prepareBarrier(sw);
EasyMock.replay(sw);
List<OFMessage> messages = new ArrayList<OFMessage>();
for (int i = 0; i < NUM_MSG; ++i) {
OFMessage msg = EasyMock.createMock(OFMessage.class);
EasyMock.expect(msg.getXid()).andReturn(1).anyTimes();
EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
EasyMock.expect(msg.getLengthU()).andReturn(100).anyTimes();
EasyMock.replay(msg);
messages.add(msg);
try {
EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.eq(msg), EasyMock.eq(context)))
.andReturn(true).once();
} catch (IOException e) {
fail("Failed in OFMessageDamper#write()");
}
}
try {
EasyMock.expect(damper.write(EasyMock.eq(sw), (OFMessage)EasyMock.anyObject(), EasyMock.eq(context)))
.andAnswer(new IAnswer<Boolean>() {
@Override
public Boolean answer() throws Throwable {
OFMessage msg = (OFMessage)EasyMock.getCurrentArguments()[1];
if (msg.getType() == OFType.BARRIER_REQUEST) {
barrierTime = System.currentTimeMillis();
}
return true;
}
}).once();
} catch (IOException e1) {
fail("Failed in OFMessageDamper#write()");
}
endInitMock();
initPusher(1);
pusher.createQueue(sw);
pusher.setRate(sw, LIMIT_RATE);
long beginTime = System.currentTimeMillis();
for (OFMessage msg : messages) {
boolean add_result = pusher.add(sw, msg);
assertTrue(add_result);
}
pusher.barrierAsync(sw);
try {
do {
Thread.sleep(1000);
} while (barrierTime == 0);
} catch (InterruptedException e) {
fail("Failed to sleep");
}
double measured_rate = NUM_MSG * 100 / (barrierTime - beginTime);
assertTrue(measured_rate < ACCEPTABLE_RATE);
for (OFMessage msg : messages) {
EasyMock.verify(msg);
}
EasyMock.verify(sw);
pusher.stop();
}
/**
* Test barrier message is correctly sent to a switch.
*/
@Test
public void testBarrierMessage() {
beginInitMock();
IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
sw.flush();
EasyMock.expectLastCall().atLeastOnce();
prepareBarrier(sw);
EasyMock.replay(sw);
try {
EasyMock.expect(damper.write(EasyMock.eq(sw), (OFMessage)EasyMock.anyObject(), EasyMock.eq(context)))
.andReturn(true).once();
} catch (IOException e1) {
fail("Failed in OFMessageDamper#write()");
}
endInitMock();
initPusher(1);
OFBarrierReplyFuture future = pusher.barrierAsync(sw);
assertNotNull(future);
pusher.stop();
}
/**
* Test FlowObject is correctly converted to message and is sent to a switch.
*/
@SuppressWarnings("unchecked")
@Test
public void testAddFlow() {
// Code below are copied from FlowManagerTest
// instantiate required objects
FlowEntry flowEntry1 = new FlowEntry();
flowEntry1.setDpid(new Dpid(1));
flowEntry1.setFlowId(new FlowId(1));
flowEntry1.setInPort(new Port((short) 1));
flowEntry1.setOutPort(new Port((short) 11));
flowEntry1.setFlowEntryId(new FlowEntryId(1));
flowEntry1.setFlowEntryMatch(new FlowEntryMatch());
flowEntry1.setFlowEntryActions(new FlowEntryActions());
flowEntry1.setFlowEntryErrorState(new FlowEntryErrorState());
flowEntry1.setFlowEntryUserState(FlowEntryUserState.FE_USER_ADD);
beginInitMock();
OFFlowMod msg = EasyMock.createMock(OFFlowMod.class);
EasyMock.expect(msg.setIdleTimeout(EasyMock.anyShort())).andReturn(msg);
EasyMock.expect(msg.setHardTimeout(EasyMock.anyShort())).andReturn(msg);
EasyMock.expect(msg.setPriority(EasyMock.anyShort())).andReturn(msg);
EasyMock.expect(msg.setBufferId(EasyMock.anyInt())).andReturn(msg);
EasyMock.expect(msg.setCookie(EasyMock.anyLong())).andReturn(msg);
EasyMock.expect(msg.setCommand(EasyMock.anyShort())).andReturn(msg);
EasyMock.expect(msg.setMatch(EasyMock.anyObject(OFMatch.class))).andReturn(msg);
EasyMock.expect(msg.setActions((List<OFAction>)EasyMock.anyObject())).andReturn(msg);
EasyMock.expect(msg.setLengthU(EasyMock.anyShort())).andReturn(msg);
EasyMock.expect(msg.setOutPort(EasyMock.anyShort())).andReturn(msg).atLeastOnce();
EasyMock.expect(msg.getXid()).andReturn(1).anyTimes();
EasyMock.expect(msg.getType()).andReturn(OFType.FLOW_MOD).anyTimes();
EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
EasyMock.replay(msg);
EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.FLOW_MOD))).andReturn(msg);
ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
(TimeUnit)EasyMock.anyObject())).andReturn(null).once();
EasyMock.replay(executor);
EasyMock.expect(threadPoolService.getScheduledExecutor()).andReturn(executor);
IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
EasyMock.expect(sw.getStringId()).andReturn("1").anyTimes();
sw.flush();
EasyMock.expectLastCall().once();
EasyMock.replay(sw);
try {
EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.anyObject(OFMessage.class), EasyMock.eq(context)))
.andAnswer(new IAnswer<Boolean>() {
@Override
public Boolean answer() throws Throwable {
OFMessage msg = (OFMessage)EasyMock.getCurrentArguments()[1];
assertEquals(msg.getType(), OFType.FLOW_MOD);
return true;
}
}).once();
} catch (IOException e1) {
fail("Failed in OFMessageDamper#write()");
}
endInitMock();
initPusher(1);
pusher.pushFlowEntry(sw, flowEntry1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
fail("Failed to sleep");
}
EasyMock.verify(sw);
pusher.stop();
}
@SuppressWarnings("unchecked")
private void beginInitMock() {
context = EasyMock.createMock(FloodlightContext.class);
modContext = EasyMock.createMock(FloodlightModuleContext.class);
factory = EasyMock.createMock(BasicFactory.class);
damper = EasyMock.createMock(OFMessageDamper.class);
flProviderService = EasyMock.createMock(IFloodlightProviderService.class);
threadPoolService = EasyMock.createMock(IThreadPoolService.class);
flowService = EasyMock.createMock(IFlowService.class);
flowService.flowEntriesPushedToSwitch(EasyMock.anyObject(Collection.class));
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IThreadPoolService.class)))
.andReturn(threadPoolService).once();
EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IFloodlightProviderService.class)))
.andReturn(flProviderService).once();
EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IFlowService.class)))
.andReturn(flowService).once();
flProviderService.addOFMessageListener(EasyMock.eq(OFType.BARRIER_REPLY),
(FlowPusher) EasyMock.anyObject());
EasyMock.expectLastCall().once();
}
private void endInitMock() {
EasyMock.replay(flowService);
EasyMock.replay(threadPoolService);
EasyMock.replay(flProviderService);
EasyMock.replay(damper);
EasyMock.replay(factory);
EasyMock.replay(modContext);
EasyMock.replay(context);
}
private void initPusher(int num_thread) {
pusher = new FlowPusher(num_thread);
pusher.init(context, modContext, factory, damper);
pusher.start();
}
private void prepareBarrier(IOFSwitch sw) {
OFBarrierRequest req = EasyMock.createMock(OFBarrierRequest.class);
req.setXid(EasyMock.anyInt());
EasyMock.expectLastCall().once();
EasyMock.expect(req.getXid()).andReturn(1).anyTimes();
EasyMock.expect(req.getType()).andReturn(OFType.BARRIER_REQUEST).anyTimes();
EasyMock.expect(req.getLength()).andReturn((short)100).anyTimes();
EasyMock.replay(req);
EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.BARRIER_REQUEST))).andReturn(req);
ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
(TimeUnit)EasyMock.anyObject())).andReturn(null).once();
EasyMock.replay(executor);
EasyMock.expect(threadPoolService.getScheduledExecutor()).andReturn(executor);
EasyMock.expect(sw.getNextTransactionId()).andReturn(1);
}
}