// blob: 042782804e7772b6a805ca00bf6db4fc279820a5
// blame: Umesh Krishnaswamy, 345ee99, 2012-12-13 20:29:48 -0800
package net.floodlightcontroller.flowcache;
2
3import static org.easymock.EasyMock.*;
4
5import java.util.ArrayList;
6import java.util.Date;
7import java.util.ListIterator;
8
9import net.floodlightcontroller.core.IListener.Command;
10import net.floodlightcontroller.core.module.FloodlightModuleContext;
11import net.floodlightcontroller.core.test.MockFloodlightProvider;
12import net.floodlightcontroller.core.test.MockThreadPoolService;
13import net.floodlightcontroller.counter.ICounterStoreService;
14import net.floodlightcontroller.counter.SimpleCounter;
15import net.floodlightcontroller.counter.CounterValue.CounterType;
16import net.floodlightcontroller.flowcache.IFlowReconcileListener;
17import net.floodlightcontroller.flowcache.OFMatchReconcile;
18import net.floodlightcontroller.test.FloodlightTestCase;
19import net.floodlightcontroller.threadpool.IThreadPoolService;
20
21import org.easymock.EasyMock;
22import org.easymock.IAnswer;
23import org.junit.Before;
24import org.junit.Test;
25import org.openflow.protocol.OFStatisticsRequest;
26import org.openflow.protocol.OFType;
27
28public class FlowReconcileMgrTest extends FloodlightTestCase {
29
30 protected MockFloodlightProvider mockFloodlightProvider;
31 protected FlowReconcileManager flowReconcileMgr;
32 protected MockThreadPoolService threadPool;
33 protected ICounterStoreService counterStore;
34 protected FloodlightModuleContext fmc;
35
36 OFStatisticsRequest ofStatsRequest;
37
38 protected int NUM_FLOWS_PER_THREAD = 100;
39 protected int NUM_THREADS = 100;
40
41 @Before
42 public void setUp() throws Exception {
43 super.setUp();
44
45 fmc = new FloodlightModuleContext();
46 flowReconcileMgr = new FlowReconcileManager();
47 threadPool = new MockThreadPoolService();
48 counterStore = createMock(ICounterStoreService.class);
49
50 fmc.addService(ICounterStoreService.class, counterStore);
51 fmc.addService(IThreadPoolService.class, threadPool);
52
53 threadPool.init(fmc);
54 flowReconcileMgr.init(fmc);
55
56 threadPool.startUp(fmc);
57 flowReconcileMgr.startUp(fmc);
58 }
59
60 /** Verify pipeline listener registration and ordering
61 *
62 * @throws Exception
63 */
64 @SuppressWarnings("unchecked")
65 @Test
66 public void testFlowReconcilePipeLine() throws Exception {
67 flowReconcileMgr.flowReconcileEnabled = true;
68
69 IFlowReconcileListener r1 =
70 EasyMock.createNiceMock(IFlowReconcileListener.class);
71 IFlowReconcileListener r2 =
72 EasyMock.createNiceMock(IFlowReconcileListener.class);
73 IFlowReconcileListener r3 =
74 EasyMock.createNiceMock(IFlowReconcileListener.class);
75
76 expect(r1.getName()).andReturn("r1").anyTimes();
77 expect(r2.getName()).andReturn("r2").anyTimes();
78 expect(r3.getName()).andReturn("r3").anyTimes();
79
80 // Set the listeners' order: r1 -> r2 -> r3
81 expect(r1.isCallbackOrderingPrereq((OFType)anyObject(),
82 (String)anyObject())).andReturn(false).anyTimes();
83 expect(r1.isCallbackOrderingPostreq((OFType)anyObject(),
84 (String)anyObject())).andReturn(false).anyTimes();
85 expect(r2.isCallbackOrderingPrereq((OFType)anyObject(),
86 eq("r1"))).andReturn(true).anyTimes();
87 expect(r2.isCallbackOrderingPrereq((OFType)anyObject(),
88 eq("r3"))).andReturn(false).anyTimes();
89 expect(r2.isCallbackOrderingPostreq((OFType)anyObject(),
90 eq("r1"))).andReturn(false).anyTimes();
91 expect(r2.isCallbackOrderingPostreq((OFType)anyObject(),
92 eq("r3"))).andReturn(true).anyTimes();
93 expect(r3.isCallbackOrderingPrereq((OFType)anyObject(),
94 eq("r1"))).andReturn(false).anyTimes();
95 expect(r3.isCallbackOrderingPrereq((OFType)anyObject(),
96 eq("r2"))).andReturn(true).anyTimes();
97 expect(r3.isCallbackOrderingPostreq((OFType)anyObject(),
98 (String)anyObject())).andReturn(false).anyTimes();
99
100 expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())).
101 andThrow(new RuntimeException("This is NOT an error! " +
102 "We are testing exception catching."));
103
104 SimpleCounter cnt = (SimpleCounter)SimpleCounter.createCounter(
105 new Date(),
106 CounterType.LONG);
107 cnt.increment();
108 expect(counterStore.getCounter(
109 flowReconcileMgr.controllerPktInCounterName))
110 .andReturn(cnt)
111 .anyTimes();
112
113 replay(r1, r2, r3, counterStore);
114 flowReconcileMgr.clearFlowReconcileListeners();
115 flowReconcileMgr.addFlowReconcileListener(r1);
116 flowReconcileMgr.addFlowReconcileListener(r2);
117 flowReconcileMgr.addFlowReconcileListener(r3);
118
119 int pre_flowReconcileThreadRunCount =
120 flowReconcileMgr.flowReconcileThreadRunCount;
121 Date startTime = new Date();
122 OFMatchReconcile ofmRcIn = new OFMatchReconcile();
123 try {
124 flowReconcileMgr.reconcileFlow(ofmRcIn);
125 flowReconcileMgr.doReconcile();
126 } catch (RuntimeException e) {
127 assertEquals(e.getMessage()
128 .startsWith("This is NOT an error!"), true);
129 }
130
131 verify(r1, r2, r3);
132
133 // verify STOP works
134 reset(r1, r2, r3);
135
136 // restart reconcileThread since it exited due to previous runtime
137 // exception.
138 flowReconcileMgr.startUp(fmc);
139 expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()))
140 .andReturn(Command.STOP).times(1);
141 expect(r2.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()));
142 expectLastCall().andAnswer(new IAnswer<Object>() {
143 public Object answer() {
144 fail("Unexpected call");
145 return Command.STOP;
146 }
147 }).anyTimes();
148
149 pre_flowReconcileThreadRunCount =
150 flowReconcileMgr.flowReconcileThreadRunCount;
151 startTime = new Date();
152 replay(r1, r2, r3);
153 flowReconcileMgr.reconcileFlow(ofmRcIn);
154 while (flowReconcileMgr.flowReconcileThreadRunCount <=
155 pre_flowReconcileThreadRunCount) {
156 Thread.sleep(10);
157 Date currTime = new Date();
158 assertTrue((currTime.getTime() - startTime.getTime()) < 1000);
159 }
160 verify(r1, r2, r3);
161
162 // verify CONTINUE works
163 reset(r1, r2, r3);
164 expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()))
165 .andReturn(Command.CONTINUE).times(1);
166 expect(r2.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()))
167 .andReturn(Command.STOP).times(1);
168 expect(r3.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()));
169 expectLastCall().andAnswer(new IAnswer<Object>() {
170 public Object answer() {
171 fail("Unexpected call");
172 return Command.STOP;
173 }
174 }).anyTimes();
175
176 pre_flowReconcileThreadRunCount =
177 flowReconcileMgr.flowReconcileThreadRunCount;
178 startTime = new Date();
179
180 replay(r1, r2, r3);
181 flowReconcileMgr.reconcileFlow(ofmRcIn);
182 while (flowReconcileMgr.flowReconcileThreadRunCount <=
183 pre_flowReconcileThreadRunCount) {
184 Thread.sleep(10);
185 Date currTime = new Date();
186 assertTrue((currTime.getTime() - startTime.getTime()) < 1000);
187 }
188 verify(r1, r2, r3);
189
190 // verify CONTINUE works
191 reset(r1, r2, r3);
192 expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()))
193 .andReturn(Command.CONTINUE).times(1);
194 expect(r2.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()))
195 .andReturn(Command.CONTINUE).times(1);
196 expect(r3.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()))
197 .andReturn(Command.STOP).times(1);
198
199 pre_flowReconcileThreadRunCount =
200 flowReconcileMgr.flowReconcileThreadRunCount;
201 startTime = new Date();
202
203 replay(r1, r2, r3);
204 flowReconcileMgr.reconcileFlow(ofmRcIn);
205 while (flowReconcileMgr.flowReconcileThreadRunCount <=
206 pre_flowReconcileThreadRunCount) {
207 Thread.sleep(10);
208 Date currTime = new Date();
209 assertTrue((currTime.getTime() - startTime.getTime()) < 1000);
210 }
211 verify(r1, r2, r3);
212
213 // Verify removeFlowReconcileListener
214 flowReconcileMgr.removeFlowReconcileListener(r1);
215 reset(r1, r2, r3);
216 expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()));
217 expectLastCall().andAnswer(new IAnswer<Object>() {
218 public Object answer() {
219 fail("Unexpected call to a listener that is " +
220 "removed from the chain.");
221 return Command.STOP;
222 }
223 }).anyTimes();
224 expect(r2.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()))
225 .andReturn(Command.CONTINUE).times(1);
226 expect(r3.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()))
227 .andReturn(Command.STOP).times(1);
228
229 pre_flowReconcileThreadRunCount =
230 flowReconcileMgr.flowReconcileThreadRunCount;
231 startTime = new Date();
232 replay(r1, r2, r3);
233 flowReconcileMgr.reconcileFlow(ofmRcIn);
234 while (flowReconcileMgr.flowReconcileThreadRunCount <=
235 pre_flowReconcileThreadRunCount) {
236 Thread.sleep(10);
237 Date currTime = new Date();
238 assertTrue((currTime.getTime() - startTime.getTime()) < 1000);
239 }
240 verify(r1, r2, r3);
241 }
242
243 @Test
244 public void testGetPktInRate() {
245 internalTestGetPktInRate(CounterType.LONG);
246 internalTestGetPktInRate(CounterType.DOUBLE);
247 }
248
249 protected void internalTestGetPktInRate(CounterType type) {
250 Date currentTime = new Date();
251 SimpleCounter newCnt = (SimpleCounter)SimpleCounter.createCounter(
252 currentTime, type);
253 newCnt.increment(currentTime, 1);
254
255 // Set the lastCounter time in the future of the current time
256 Date lastCounterTime = new Date(currentTime.getTime() + 1000);
257 flowReconcileMgr.lastPacketInCounter =
258 (SimpleCounter)SimpleCounter.createCounter(
259 lastCounterTime, type);
260 flowReconcileMgr.lastPacketInCounter.increment(lastCounterTime, 1);
261
262 assertEquals(FlowReconcileManager.MAX_SYSTEM_LOAD_PER_SECOND,
263 flowReconcileMgr.getPktInRate(newCnt, new Date()));
264
265 // Verify the rate == 0 time difference is zero.
266 lastCounterTime = new Date(currentTime.getTime() - 1000);
267 flowReconcileMgr.lastPacketInCounter.increment(lastCounterTime, 1);
268 assertEquals(0, flowReconcileMgr.getPktInRate(newCnt, lastCounterTime));
269
270 /** verify the computation is correct.
271 * new = 2000, old = 1000, Tdiff = 1 second.
272 * rate should be 1000/second
273 */
274 newCnt = (SimpleCounter)SimpleCounter.createCounter(
275 currentTime, type);
276 newCnt.increment(currentTime, 2000);
277
278 lastCounterTime = new Date(currentTime.getTime() - 1000);
279 flowReconcileMgr.lastPacketInCounter =
280 (SimpleCounter)SimpleCounter.createCounter(
281 lastCounterTime, type);
282 flowReconcileMgr.lastPacketInCounter.increment(lastCounterTime, 1000);
283 assertEquals(1000, flowReconcileMgr.getPktInRate(newCnt, currentTime));
284
285 /** verify the computation is correct.
286 * new = 2,000,000, old = 1,000,000, Tdiff = 2 second.
287 * rate should be 1000/second
288 */
289 newCnt = (SimpleCounter)SimpleCounter.createCounter(
290 currentTime, type);
291 newCnt.increment(currentTime, 2000000);
292
293 lastCounterTime = new Date(currentTime.getTime() - 2000);
294 flowReconcileMgr.lastPacketInCounter =
295 (SimpleCounter)SimpleCounter.createCounter(
296 lastCounterTime, type);
297 flowReconcileMgr.lastPacketInCounter.increment(lastCounterTime,
298 1000000);
299 assertEquals(500000, flowReconcileMgr.getPktInRate(newCnt,
300 currentTime));
301 }
302
303 @Test
304 public void testGetCurrentCapacity() throws Exception {
305 // Disable the reconcile thread.
306 flowReconcileMgr.flowReconcileEnabled = false;
307
308 int minFlows = FlowReconcileManager.MIN_FLOW_RECONCILE_PER_SECOND *
309 FlowReconcileManager.FLOW_RECONCILE_DELAY_MILLISEC / 1000;
310
311 /** Verify the initial state, when packetIn counter has not
312 * been created.
313 */
314 expect(counterStore.getCounter(
315 flowReconcileMgr.controllerPktInCounterName))
316 .andReturn(null)
317 .times(1);
318
319 replay(counterStore);
320 assertEquals(minFlows, flowReconcileMgr.getCurrentCapacity());
321 verify(counterStore);
322
323 /** Verify the initial state, when lastPacketInCounter is null */
324 reset(counterStore);
325 Date currentTime = new Date();
326 SimpleCounter newCnt = (SimpleCounter)SimpleCounter.createCounter(
327 currentTime, CounterType.LONG);
328
329 expect(counterStore.getCounter(
330 flowReconcileMgr.controllerPktInCounterName))
331 .andReturn(newCnt)
332 .times(1);
333 long initPktInCount = 10000;
334 newCnt.increment(currentTime, initPktInCount);
335
336 replay(counterStore);
337 assertEquals(minFlows, flowReconcileMgr.getCurrentCapacity());
338 verify(counterStore);
339
340 /** Now the lastPacketInCounter has been set.
341 * lastCounter = 100,000 and newCounter = 300,000, t = 1 second
342 * packetInRate = 200,000/sec.
343 * capacity should be 500k - 200k = 300k
344 */
345 reset(counterStore);
346 newCnt = (SimpleCounter)SimpleCounter.createCounter(
347 currentTime, CounterType.LONG);
348 currentTime = new Date(currentTime.getTime() + 200);
349 long nextPktInCount = 30000;
350 newCnt.increment(currentTime, nextPktInCount);
351
352 expect(counterStore.getCounter(
353 flowReconcileMgr.controllerPktInCounterName))
354 .andReturn(newCnt)
355 .times(1);
356
357 replay(counterStore);
358 // Wait for 1 second so that enough elapsed time to compute capacity.
359 Thread.sleep(1000);
360 int capacity = flowReconcileMgr.getCurrentCapacity();
361 verify(counterStore);
362 long expectedCap = (FlowReconcileManager.MAX_SYSTEM_LOAD_PER_SECOND -
363 (nextPktInCount - initPktInCount)) *
364 FlowReconcileManager.FLOW_RECONCILE_DELAY_MILLISEC / 1000;
365 assertEquals(expectedCap, capacity);
366 }
367
368 private class FlowReconcileWorker implements Runnable {
369 @Override
370 public void run() {
371 OFMatchReconcile ofmRc = new OFMatchReconcile();
372 // push large number of flows to be reconciled.
373 for (int i = 0; i < NUM_FLOWS_PER_THREAD; i++) {
374 flowReconcileMgr.reconcileFlow(ofmRc);
375 }
376 }
377 }
378
379 /** Verify the flows are sent to the reconcile pipeline in order.
380 */
381 @SuppressWarnings("unchecked")
382 @Test
383 public void testQueueFlowsOrder() {
384 flowReconcileMgr.flowReconcileEnabled = false;
385
386 IFlowReconcileListener r1 =
387 EasyMock.createNiceMock(IFlowReconcileListener.class);
388
389 expect(r1.getName()).andReturn("r1").anyTimes();
390
391 // Set the listeners' order: r1 -> r2 -> r3
392 expect(r1.isCallbackOrderingPrereq((OFType)anyObject(),
393 (String)anyObject())).andReturn(false).anyTimes();
394 expect(r1.isCallbackOrderingPostreq((OFType)anyObject(),
395 (String)anyObject())).andReturn(false).anyTimes();
396
397 expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()))
398 .andAnswer(new IAnswer<Command>() {
399 @Override
400 public Command answer() throws Throwable {
401 ArrayList<OFMatchReconcile> ofmList =
402 (ArrayList<OFMatchReconcile>)EasyMock.
403 getCurrentArguments()[0];
404 ListIterator<OFMatchReconcile> lit = ofmList.listIterator();
405 int index = 0;
406 while (lit.hasNext()) {
407 OFMatchReconcile ofm = lit.next();
408 assertEquals(index++, ofm.cookie);
409 }
410 return Command.STOP;
411 }
412 }).times(1);
413
414 SimpleCounter cnt = (SimpleCounter)SimpleCounter.createCounter(
415 new Date(),
416 CounterType.LONG);
417 cnt.increment();
418 expect(counterStore.getCounter(
419 flowReconcileMgr.controllerPktInCounterName))
420 .andReturn(cnt)
421 .anyTimes();
422
423 replay(r1, counterStore);
424 flowReconcileMgr.clearFlowReconcileListeners();
425 flowReconcileMgr.addFlowReconcileListener(r1);
426
427 OFMatchReconcile ofmRcIn = new OFMatchReconcile();
428 int index = 0;
429 for (index = 0; index < 10; index++) {
430 ofmRcIn.cookie = index;
431 flowReconcileMgr.reconcileFlow(ofmRcIn);
432 }
433 flowReconcileMgr.flowReconcileEnabled = true;
434 flowReconcileMgr.doReconcile();
435
436 verify(r1);
437 }
438
439 @SuppressWarnings("unchecked")
440 @Test
441 public void testQueueFlowsByManyThreads() {
442 // Disable the reconcile thread so that the queue won't be emptied.
443 flowQueueTest(false);
444
445 // Enable the reconcile thread. The queue should be empty.
446 Date currentTime = new Date();
447 SimpleCounter newCnt = (SimpleCounter)SimpleCounter.createCounter(
448 currentTime, CounterType.LONG);
449
450 expect(counterStore.getCounter(
451 flowReconcileMgr.controllerPktInCounterName))
452 .andReturn(newCnt)
453 .anyTimes();
454 long initPktInCount = 10000;
455 newCnt.increment(currentTime, initPktInCount);
456
457 IFlowReconcileListener r1 =
458 EasyMock.createNiceMock(IFlowReconcileListener.class);
459
460 expect(r1.getName()).andReturn("r1").anyTimes();
461
462 // Set the listeners' order: r1 -> r2 -> r3
463 expect(r1.isCallbackOrderingPrereq((OFType)anyObject(),
464 (String)anyObject())).andReturn(false).anyTimes();
465 expect(r1.isCallbackOrderingPostreq((OFType)anyObject(),
466 (String)anyObject())).andReturn(false).anyTimes();
467
468 expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject()))
469 .andReturn(Command.CONTINUE).anyTimes();
470
471 flowReconcileMgr.clearFlowReconcileListeners();
472 replay(r1, counterStore);
473 flowQueueTest(true);
474 verify(r1, counterStore);
475 }
476
477 protected void flowQueueTest(boolean enableReconcileThread) {
478 flowReconcileMgr.flowReconcileEnabled = enableReconcileThread;
479
480 // Simulate flow
481 for (int i = 0; i < NUM_THREADS; i++) {
482 Runnable worker = this.new FlowReconcileWorker();
483 Thread t = new Thread(worker);
484 t.start();
485 }
486
487 Date startTime = new Date();
488 int totalFlows = NUM_THREADS * NUM_FLOWS_PER_THREAD;
489 if (enableReconcileThread) {
490 totalFlows = 0;
491 }
492 while (flowReconcileMgr.flowQueue.size() != totalFlows) {
493 Date currTime = new Date();
494 assertTrue((currTime.getTime() - startTime.getTime()) < 2000);
495 }
496
497 // Make sure all flows are in the queue.
498 assertEquals(totalFlows, flowReconcileMgr.flowQueue.size());
499 }
500}