Umesh Krishnaswamy | 345ee99 | 2012-12-13 20:29:48 -0800 | [diff] [blame] | 1 | package net.floodlightcontroller.flowcache; |
| 2 | |
| 3 | import static org.easymock.EasyMock.*; |
| 4 | |
| 5 | import java.util.ArrayList; |
| 6 | import java.util.Date; |
| 7 | import java.util.ListIterator; |
| 8 | |
| 9 | import net.floodlightcontroller.core.IListener.Command; |
| 10 | import net.floodlightcontroller.core.module.FloodlightModuleContext; |
| 11 | import net.floodlightcontroller.core.test.MockFloodlightProvider; |
| 12 | import net.floodlightcontroller.core.test.MockThreadPoolService; |
| 13 | import net.floodlightcontroller.counter.ICounterStoreService; |
| 14 | import net.floodlightcontroller.counter.SimpleCounter; |
| 15 | import net.floodlightcontroller.counter.CounterValue.CounterType; |
| 16 | import net.floodlightcontroller.flowcache.IFlowReconcileListener; |
| 17 | import net.floodlightcontroller.flowcache.OFMatchReconcile; |
| 18 | import net.floodlightcontroller.test.FloodlightTestCase; |
| 19 | import net.floodlightcontroller.threadpool.IThreadPoolService; |
| 20 | |
| 21 | import org.easymock.EasyMock; |
| 22 | import org.easymock.IAnswer; |
| 23 | import org.junit.Before; |
| 24 | import org.junit.Test; |
| 25 | import org.openflow.protocol.OFStatisticsRequest; |
| 26 | import org.openflow.protocol.OFType; |
| 27 | |
| 28 | public class FlowReconcileMgrTest extends FloodlightTestCase { |
| 29 | |
| 30 | protected MockFloodlightProvider mockFloodlightProvider; |
| 31 | protected FlowReconcileManager flowReconcileMgr; |
| 32 | protected MockThreadPoolService threadPool; |
| 33 | protected ICounterStoreService counterStore; |
| 34 | protected FloodlightModuleContext fmc; |
| 35 | |
| 36 | OFStatisticsRequest ofStatsRequest; |
| 37 | |
| 38 | protected int NUM_FLOWS_PER_THREAD = 100; |
| 39 | protected int NUM_THREADS = 100; |
| 40 | |
| 41 | @Before |
| 42 | public void setUp() throws Exception { |
| 43 | super.setUp(); |
| 44 | |
| 45 | fmc = new FloodlightModuleContext(); |
| 46 | flowReconcileMgr = new FlowReconcileManager(); |
| 47 | threadPool = new MockThreadPoolService(); |
| 48 | counterStore = createMock(ICounterStoreService.class); |
| 49 | |
| 50 | fmc.addService(ICounterStoreService.class, counterStore); |
| 51 | fmc.addService(IThreadPoolService.class, threadPool); |
| 52 | |
| 53 | threadPool.init(fmc); |
| 54 | flowReconcileMgr.init(fmc); |
| 55 | |
| 56 | threadPool.startUp(fmc); |
| 57 | flowReconcileMgr.startUp(fmc); |
| 58 | } |
| 59 | |
| 60 | /** Verify pipeline listener registration and ordering |
| 61 | * |
| 62 | * @throws Exception |
| 63 | */ |
| 64 | @SuppressWarnings("unchecked") |
| 65 | @Test |
| 66 | public void testFlowReconcilePipeLine() throws Exception { |
| 67 | flowReconcileMgr.flowReconcileEnabled = true; |
| 68 | |
| 69 | IFlowReconcileListener r1 = |
| 70 | EasyMock.createNiceMock(IFlowReconcileListener.class); |
| 71 | IFlowReconcileListener r2 = |
| 72 | EasyMock.createNiceMock(IFlowReconcileListener.class); |
| 73 | IFlowReconcileListener r3 = |
| 74 | EasyMock.createNiceMock(IFlowReconcileListener.class); |
| 75 | |
| 76 | expect(r1.getName()).andReturn("r1").anyTimes(); |
| 77 | expect(r2.getName()).andReturn("r2").anyTimes(); |
| 78 | expect(r3.getName()).andReturn("r3").anyTimes(); |
| 79 | |
| 80 | // Set the listeners' order: r1 -> r2 -> r3 |
| 81 | expect(r1.isCallbackOrderingPrereq((OFType)anyObject(), |
| 82 | (String)anyObject())).andReturn(false).anyTimes(); |
| 83 | expect(r1.isCallbackOrderingPostreq((OFType)anyObject(), |
| 84 | (String)anyObject())).andReturn(false).anyTimes(); |
| 85 | expect(r2.isCallbackOrderingPrereq((OFType)anyObject(), |
| 86 | eq("r1"))).andReturn(true).anyTimes(); |
| 87 | expect(r2.isCallbackOrderingPrereq((OFType)anyObject(), |
| 88 | eq("r3"))).andReturn(false).anyTimes(); |
| 89 | expect(r2.isCallbackOrderingPostreq((OFType)anyObject(), |
| 90 | eq("r1"))).andReturn(false).anyTimes(); |
| 91 | expect(r2.isCallbackOrderingPostreq((OFType)anyObject(), |
| 92 | eq("r3"))).andReturn(true).anyTimes(); |
| 93 | expect(r3.isCallbackOrderingPrereq((OFType)anyObject(), |
| 94 | eq("r1"))).andReturn(false).anyTimes(); |
| 95 | expect(r3.isCallbackOrderingPrereq((OFType)anyObject(), |
| 96 | eq("r2"))).andReturn(true).anyTimes(); |
| 97 | expect(r3.isCallbackOrderingPostreq((OFType)anyObject(), |
| 98 | (String)anyObject())).andReturn(false).anyTimes(); |
| 99 | |
| 100 | expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())). |
| 101 | andThrow(new RuntimeException("This is NOT an error! " + |
| 102 | "We are testing exception catching.")); |
| 103 | |
| 104 | SimpleCounter cnt = (SimpleCounter)SimpleCounter.createCounter( |
| 105 | new Date(), |
| 106 | CounterType.LONG); |
| 107 | cnt.increment(); |
| 108 | expect(counterStore.getCounter( |
| 109 | flowReconcileMgr.controllerPktInCounterName)) |
| 110 | .andReturn(cnt) |
| 111 | .anyTimes(); |
| 112 | |
| 113 | replay(r1, r2, r3, counterStore); |
| 114 | flowReconcileMgr.clearFlowReconcileListeners(); |
| 115 | flowReconcileMgr.addFlowReconcileListener(r1); |
| 116 | flowReconcileMgr.addFlowReconcileListener(r2); |
| 117 | flowReconcileMgr.addFlowReconcileListener(r3); |
| 118 | |
| 119 | int pre_flowReconcileThreadRunCount = |
| 120 | flowReconcileMgr.flowReconcileThreadRunCount; |
| 121 | Date startTime = new Date(); |
| 122 | OFMatchReconcile ofmRcIn = new OFMatchReconcile(); |
| 123 | try { |
| 124 | flowReconcileMgr.reconcileFlow(ofmRcIn); |
| 125 | flowReconcileMgr.doReconcile(); |
| 126 | } catch (RuntimeException e) { |
| 127 | assertEquals(e.getMessage() |
| 128 | .startsWith("This is NOT an error!"), true); |
| 129 | } |
| 130 | |
| 131 | verify(r1, r2, r3); |
| 132 | |
| 133 | // verify STOP works |
| 134 | reset(r1, r2, r3); |
| 135 | |
| 136 | // restart reconcileThread since it exited due to previous runtime |
| 137 | // exception. |
| 138 | flowReconcileMgr.startUp(fmc); |
| 139 | expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())) |
| 140 | .andReturn(Command.STOP).times(1); |
| 141 | expect(r2.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())); |
| 142 | expectLastCall().andAnswer(new IAnswer<Object>() { |
| 143 | public Object answer() { |
| 144 | fail("Unexpected call"); |
| 145 | return Command.STOP; |
| 146 | } |
| 147 | }).anyTimes(); |
| 148 | |
| 149 | pre_flowReconcileThreadRunCount = |
| 150 | flowReconcileMgr.flowReconcileThreadRunCount; |
| 151 | startTime = new Date(); |
| 152 | replay(r1, r2, r3); |
| 153 | flowReconcileMgr.reconcileFlow(ofmRcIn); |
| 154 | while (flowReconcileMgr.flowReconcileThreadRunCount <= |
| 155 | pre_flowReconcileThreadRunCount) { |
| 156 | Thread.sleep(10); |
| 157 | Date currTime = new Date(); |
| 158 | assertTrue((currTime.getTime() - startTime.getTime()) < 1000); |
| 159 | } |
| 160 | verify(r1, r2, r3); |
| 161 | |
| 162 | // verify CONTINUE works |
| 163 | reset(r1, r2, r3); |
| 164 | expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())) |
| 165 | .andReturn(Command.CONTINUE).times(1); |
| 166 | expect(r2.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())) |
| 167 | .andReturn(Command.STOP).times(1); |
| 168 | expect(r3.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())); |
| 169 | expectLastCall().andAnswer(new IAnswer<Object>() { |
| 170 | public Object answer() { |
| 171 | fail("Unexpected call"); |
| 172 | return Command.STOP; |
| 173 | } |
| 174 | }).anyTimes(); |
| 175 | |
| 176 | pre_flowReconcileThreadRunCount = |
| 177 | flowReconcileMgr.flowReconcileThreadRunCount; |
| 178 | startTime = new Date(); |
| 179 | |
| 180 | replay(r1, r2, r3); |
| 181 | flowReconcileMgr.reconcileFlow(ofmRcIn); |
| 182 | while (flowReconcileMgr.flowReconcileThreadRunCount <= |
| 183 | pre_flowReconcileThreadRunCount) { |
| 184 | Thread.sleep(10); |
| 185 | Date currTime = new Date(); |
| 186 | assertTrue((currTime.getTime() - startTime.getTime()) < 1000); |
| 187 | } |
| 188 | verify(r1, r2, r3); |
| 189 | |
| 190 | // verify CONTINUE works |
| 191 | reset(r1, r2, r3); |
| 192 | expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())) |
| 193 | .andReturn(Command.CONTINUE).times(1); |
| 194 | expect(r2.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())) |
| 195 | .andReturn(Command.CONTINUE).times(1); |
| 196 | expect(r3.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())) |
| 197 | .andReturn(Command.STOP).times(1); |
| 198 | |
| 199 | pre_flowReconcileThreadRunCount = |
| 200 | flowReconcileMgr.flowReconcileThreadRunCount; |
| 201 | startTime = new Date(); |
| 202 | |
| 203 | replay(r1, r2, r3); |
| 204 | flowReconcileMgr.reconcileFlow(ofmRcIn); |
| 205 | while (flowReconcileMgr.flowReconcileThreadRunCount <= |
| 206 | pre_flowReconcileThreadRunCount) { |
| 207 | Thread.sleep(10); |
| 208 | Date currTime = new Date(); |
| 209 | assertTrue((currTime.getTime() - startTime.getTime()) < 1000); |
| 210 | } |
| 211 | verify(r1, r2, r3); |
| 212 | |
| 213 | // Verify removeFlowReconcileListener |
| 214 | flowReconcileMgr.removeFlowReconcileListener(r1); |
| 215 | reset(r1, r2, r3); |
| 216 | expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())); |
| 217 | expectLastCall().andAnswer(new IAnswer<Object>() { |
| 218 | public Object answer() { |
| 219 | fail("Unexpected call to a listener that is " + |
| 220 | "removed from the chain."); |
| 221 | return Command.STOP; |
| 222 | } |
| 223 | }).anyTimes(); |
| 224 | expect(r2.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())) |
| 225 | .andReturn(Command.CONTINUE).times(1); |
| 226 | expect(r3.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())) |
| 227 | .andReturn(Command.STOP).times(1); |
| 228 | |
| 229 | pre_flowReconcileThreadRunCount = |
| 230 | flowReconcileMgr.flowReconcileThreadRunCount; |
| 231 | startTime = new Date(); |
| 232 | replay(r1, r2, r3); |
| 233 | flowReconcileMgr.reconcileFlow(ofmRcIn); |
| 234 | while (flowReconcileMgr.flowReconcileThreadRunCount <= |
| 235 | pre_flowReconcileThreadRunCount) { |
| 236 | Thread.sleep(10); |
| 237 | Date currTime = new Date(); |
| 238 | assertTrue((currTime.getTime() - startTime.getTime()) < 1000); |
| 239 | } |
| 240 | verify(r1, r2, r3); |
| 241 | } |
| 242 | |
| 243 | @Test |
| 244 | public void testGetPktInRate() { |
| 245 | internalTestGetPktInRate(CounterType.LONG); |
| 246 | internalTestGetPktInRate(CounterType.DOUBLE); |
| 247 | } |
| 248 | |
| 249 | protected void internalTestGetPktInRate(CounterType type) { |
| 250 | Date currentTime = new Date(); |
| 251 | SimpleCounter newCnt = (SimpleCounter)SimpleCounter.createCounter( |
| 252 | currentTime, type); |
| 253 | newCnt.increment(currentTime, 1); |
| 254 | |
| 255 | // Set the lastCounter time in the future of the current time |
| 256 | Date lastCounterTime = new Date(currentTime.getTime() + 1000); |
| 257 | flowReconcileMgr.lastPacketInCounter = |
| 258 | (SimpleCounter)SimpleCounter.createCounter( |
| 259 | lastCounterTime, type); |
| 260 | flowReconcileMgr.lastPacketInCounter.increment(lastCounterTime, 1); |
| 261 | |
| 262 | assertEquals(FlowReconcileManager.MAX_SYSTEM_LOAD_PER_SECOND, |
| 263 | flowReconcileMgr.getPktInRate(newCnt, new Date())); |
| 264 | |
| 265 | // Verify the rate == 0 time difference is zero. |
| 266 | lastCounterTime = new Date(currentTime.getTime() - 1000); |
| 267 | flowReconcileMgr.lastPacketInCounter.increment(lastCounterTime, 1); |
| 268 | assertEquals(0, flowReconcileMgr.getPktInRate(newCnt, lastCounterTime)); |
| 269 | |
| 270 | /** verify the computation is correct. |
| 271 | * new = 2000, old = 1000, Tdiff = 1 second. |
| 272 | * rate should be 1000/second |
| 273 | */ |
| 274 | newCnt = (SimpleCounter)SimpleCounter.createCounter( |
| 275 | currentTime, type); |
| 276 | newCnt.increment(currentTime, 2000); |
| 277 | |
| 278 | lastCounterTime = new Date(currentTime.getTime() - 1000); |
| 279 | flowReconcileMgr.lastPacketInCounter = |
| 280 | (SimpleCounter)SimpleCounter.createCounter( |
| 281 | lastCounterTime, type); |
| 282 | flowReconcileMgr.lastPacketInCounter.increment(lastCounterTime, 1000); |
| 283 | assertEquals(1000, flowReconcileMgr.getPktInRate(newCnt, currentTime)); |
| 284 | |
| 285 | /** verify the computation is correct. |
| 286 | * new = 2,000,000, old = 1,000,000, Tdiff = 2 second. |
| 287 | * rate should be 1000/second |
| 288 | */ |
| 289 | newCnt = (SimpleCounter)SimpleCounter.createCounter( |
| 290 | currentTime, type); |
| 291 | newCnt.increment(currentTime, 2000000); |
| 292 | |
| 293 | lastCounterTime = new Date(currentTime.getTime() - 2000); |
| 294 | flowReconcileMgr.lastPacketInCounter = |
| 295 | (SimpleCounter)SimpleCounter.createCounter( |
| 296 | lastCounterTime, type); |
| 297 | flowReconcileMgr.lastPacketInCounter.increment(lastCounterTime, |
| 298 | 1000000); |
| 299 | assertEquals(500000, flowReconcileMgr.getPktInRate(newCnt, |
| 300 | currentTime)); |
| 301 | } |
| 302 | |
| 303 | @Test |
| 304 | public void testGetCurrentCapacity() throws Exception { |
| 305 | // Disable the reconcile thread. |
| 306 | flowReconcileMgr.flowReconcileEnabled = false; |
| 307 | |
| 308 | int minFlows = FlowReconcileManager.MIN_FLOW_RECONCILE_PER_SECOND * |
| 309 | FlowReconcileManager.FLOW_RECONCILE_DELAY_MILLISEC / 1000; |
| 310 | |
| 311 | /** Verify the initial state, when packetIn counter has not |
| 312 | * been created. |
| 313 | */ |
| 314 | expect(counterStore.getCounter( |
| 315 | flowReconcileMgr.controllerPktInCounterName)) |
| 316 | .andReturn(null) |
| 317 | .times(1); |
| 318 | |
| 319 | replay(counterStore); |
| 320 | assertEquals(minFlows, flowReconcileMgr.getCurrentCapacity()); |
| 321 | verify(counterStore); |
| 322 | |
| 323 | /** Verify the initial state, when lastPacketInCounter is null */ |
| 324 | reset(counterStore); |
| 325 | Date currentTime = new Date(); |
| 326 | SimpleCounter newCnt = (SimpleCounter)SimpleCounter.createCounter( |
| 327 | currentTime, CounterType.LONG); |
| 328 | |
| 329 | expect(counterStore.getCounter( |
| 330 | flowReconcileMgr.controllerPktInCounterName)) |
| 331 | .andReturn(newCnt) |
| 332 | .times(1); |
| 333 | long initPktInCount = 10000; |
| 334 | newCnt.increment(currentTime, initPktInCount); |
| 335 | |
| 336 | replay(counterStore); |
| 337 | assertEquals(minFlows, flowReconcileMgr.getCurrentCapacity()); |
| 338 | verify(counterStore); |
| 339 | |
| 340 | /** Now the lastPacketInCounter has been set. |
| 341 | * lastCounter = 100,000 and newCounter = 300,000, t = 1 second |
| 342 | * packetInRate = 200,000/sec. |
| 343 | * capacity should be 500k - 200k = 300k |
| 344 | */ |
| 345 | reset(counterStore); |
| 346 | newCnt = (SimpleCounter)SimpleCounter.createCounter( |
| 347 | currentTime, CounterType.LONG); |
| 348 | currentTime = new Date(currentTime.getTime() + 200); |
| 349 | long nextPktInCount = 30000; |
| 350 | newCnt.increment(currentTime, nextPktInCount); |
| 351 | |
| 352 | expect(counterStore.getCounter( |
| 353 | flowReconcileMgr.controllerPktInCounterName)) |
| 354 | .andReturn(newCnt) |
| 355 | .times(1); |
| 356 | |
| 357 | replay(counterStore); |
| 358 | // Wait for 1 second so that enough elapsed time to compute capacity. |
| 359 | Thread.sleep(1000); |
| 360 | int capacity = flowReconcileMgr.getCurrentCapacity(); |
| 361 | verify(counterStore); |
| 362 | long expectedCap = (FlowReconcileManager.MAX_SYSTEM_LOAD_PER_SECOND - |
| 363 | (nextPktInCount - initPktInCount)) * |
| 364 | FlowReconcileManager.FLOW_RECONCILE_DELAY_MILLISEC / 1000; |
| 365 | assertEquals(expectedCap, capacity); |
| 366 | } |
| 367 | |
| 368 | private class FlowReconcileWorker implements Runnable { |
| 369 | @Override |
| 370 | public void run() { |
| 371 | OFMatchReconcile ofmRc = new OFMatchReconcile(); |
| 372 | // push large number of flows to be reconciled. |
| 373 | for (int i = 0; i < NUM_FLOWS_PER_THREAD; i++) { |
| 374 | flowReconcileMgr.reconcileFlow(ofmRc); |
| 375 | } |
| 376 | } |
| 377 | } |
| 378 | |
| 379 | /** Verify the flows are sent to the reconcile pipeline in order. |
| 380 | */ |
| 381 | @SuppressWarnings("unchecked") |
| 382 | @Test |
| 383 | public void testQueueFlowsOrder() { |
| 384 | flowReconcileMgr.flowReconcileEnabled = false; |
| 385 | |
| 386 | IFlowReconcileListener r1 = |
| 387 | EasyMock.createNiceMock(IFlowReconcileListener.class); |
| 388 | |
| 389 | expect(r1.getName()).andReturn("r1").anyTimes(); |
| 390 | |
| 391 | // Set the listeners' order: r1 -> r2 -> r3 |
| 392 | expect(r1.isCallbackOrderingPrereq((OFType)anyObject(), |
| 393 | (String)anyObject())).andReturn(false).anyTimes(); |
| 394 | expect(r1.isCallbackOrderingPostreq((OFType)anyObject(), |
| 395 | (String)anyObject())).andReturn(false).anyTimes(); |
| 396 | |
| 397 | expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())) |
| 398 | .andAnswer(new IAnswer<Command>() { |
| 399 | @Override |
| 400 | public Command answer() throws Throwable { |
| 401 | ArrayList<OFMatchReconcile> ofmList = |
| 402 | (ArrayList<OFMatchReconcile>)EasyMock. |
| 403 | getCurrentArguments()[0]; |
| 404 | ListIterator<OFMatchReconcile> lit = ofmList.listIterator(); |
| 405 | int index = 0; |
| 406 | while (lit.hasNext()) { |
| 407 | OFMatchReconcile ofm = lit.next(); |
| 408 | assertEquals(index++, ofm.cookie); |
| 409 | } |
| 410 | return Command.STOP; |
| 411 | } |
| 412 | }).times(1); |
| 413 | |
| 414 | SimpleCounter cnt = (SimpleCounter)SimpleCounter.createCounter( |
| 415 | new Date(), |
| 416 | CounterType.LONG); |
| 417 | cnt.increment(); |
| 418 | expect(counterStore.getCounter( |
| 419 | flowReconcileMgr.controllerPktInCounterName)) |
| 420 | .andReturn(cnt) |
| 421 | .anyTimes(); |
| 422 | |
| 423 | replay(r1, counterStore); |
| 424 | flowReconcileMgr.clearFlowReconcileListeners(); |
| 425 | flowReconcileMgr.addFlowReconcileListener(r1); |
| 426 | |
| 427 | OFMatchReconcile ofmRcIn = new OFMatchReconcile(); |
| 428 | int index = 0; |
| 429 | for (index = 0; index < 10; index++) { |
| 430 | ofmRcIn.cookie = index; |
| 431 | flowReconcileMgr.reconcileFlow(ofmRcIn); |
| 432 | } |
| 433 | flowReconcileMgr.flowReconcileEnabled = true; |
| 434 | flowReconcileMgr.doReconcile(); |
| 435 | |
| 436 | verify(r1); |
| 437 | } |
| 438 | |
| 439 | @SuppressWarnings("unchecked") |
| 440 | @Test |
| 441 | public void testQueueFlowsByManyThreads() { |
| 442 | // Disable the reconcile thread so that the queue won't be emptied. |
| 443 | flowQueueTest(false); |
| 444 | |
| 445 | // Enable the reconcile thread. The queue should be empty. |
| 446 | Date currentTime = new Date(); |
| 447 | SimpleCounter newCnt = (SimpleCounter)SimpleCounter.createCounter( |
| 448 | currentTime, CounterType.LONG); |
| 449 | |
| 450 | expect(counterStore.getCounter( |
| 451 | flowReconcileMgr.controllerPktInCounterName)) |
| 452 | .andReturn(newCnt) |
| 453 | .anyTimes(); |
| 454 | long initPktInCount = 10000; |
| 455 | newCnt.increment(currentTime, initPktInCount); |
| 456 | |
| 457 | IFlowReconcileListener r1 = |
| 458 | EasyMock.createNiceMock(IFlowReconcileListener.class); |
| 459 | |
| 460 | expect(r1.getName()).andReturn("r1").anyTimes(); |
| 461 | |
| 462 | // Set the listeners' order: r1 -> r2 -> r3 |
| 463 | expect(r1.isCallbackOrderingPrereq((OFType)anyObject(), |
| 464 | (String)anyObject())).andReturn(false).anyTimes(); |
| 465 | expect(r1.isCallbackOrderingPostreq((OFType)anyObject(), |
| 466 | (String)anyObject())).andReturn(false).anyTimes(); |
| 467 | |
| 468 | expect(r1.reconcileFlows((ArrayList<OFMatchReconcile>)anyObject())) |
| 469 | .andReturn(Command.CONTINUE).anyTimes(); |
| 470 | |
| 471 | flowReconcileMgr.clearFlowReconcileListeners(); |
| 472 | replay(r1, counterStore); |
| 473 | flowQueueTest(true); |
| 474 | verify(r1, counterStore); |
| 475 | } |
| 476 | |
| 477 | protected void flowQueueTest(boolean enableReconcileThread) { |
| 478 | flowReconcileMgr.flowReconcileEnabled = enableReconcileThread; |
| 479 | |
| 480 | // Simulate flow |
| 481 | for (int i = 0; i < NUM_THREADS; i++) { |
| 482 | Runnable worker = this.new FlowReconcileWorker(); |
| 483 | Thread t = new Thread(worker); |
| 484 | t.start(); |
| 485 | } |
| 486 | |
| 487 | Date startTime = new Date(); |
| 488 | int totalFlows = NUM_THREADS * NUM_FLOWS_PER_THREAD; |
| 489 | if (enableReconcileThread) { |
| 490 | totalFlows = 0; |
| 491 | } |
| 492 | while (flowReconcileMgr.flowQueue.size() != totalFlows) { |
| 493 | Date currTime = new Date(); |
| 494 | assertTrue((currTime.getTime() - startTime.getTime()) < 2000); |
| 495 | } |
| 496 | |
| 497 | // Make sure all flows are in the queue. |
| 498 | assertEquals(totalFlows, flowReconcileMgr.flowQueue.size()); |
| 499 | } |
| 500 | } |