blob: 1e4259579ebcf913d07cc530a2bd2a4347f071f2 [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onlab.util;
17
18import com.google.common.collect.Lists;
Yuta HIGUCHI2ca15392017-01-25 16:24:22 -080019
20import org.junit.Ignore;
Brian O'Connorc6713a82015-02-24 11:55:48 -080021import org.junit.Test;
22
23import java.util.List;
24import java.util.concurrent.CountDownLatch;
25import java.util.concurrent.ExecutionException;
26import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Executors;
28import java.util.concurrent.Future;
29import java.util.concurrent.TimeUnit;
30import java.util.concurrent.atomic.AtomicBoolean;
31
32import static org.junit.Assert.*;
33import static org.onlab.util.BoundedThreadPool.*;
34import static org.onlab.util.Tools.namedThreads;
35
36/**
37 * Test of BoundedThreadPool.
38 */
39public final class BoundedThreadPoolTest {
40
41 @Test
42 public void simpleJob() {
43 final Thread myThread = Thread.currentThread();
44 final AtomicBoolean sameThread = new AtomicBoolean(true);
45 final CountDownLatch latch = new CountDownLatch(1);
46
47 BoundedThreadPool exec = newSingleThreadExecutor(namedThreads("test"));
48 exec.submit(() -> {
49 sameThread.set(myThread.equals(Thread.currentThread()));
50 latch.countDown();
51 });
52
53 try {
54 assertTrue("Job not run", latch.await(100, TimeUnit.MILLISECONDS));
55 assertFalse("Runnable used caller thread", sameThread.get());
56 } catch (InterruptedException e) {
57 fail();
58 } finally {
59 exec.shutdown();
60 }
61
62 // TODO perhaps move to tearDown
63 try {
64 assertTrue(exec.awaitTermination(1, TimeUnit.SECONDS));
65 } catch (InterruptedException e) {
66 fail();
67 }
68 }
69
70 private List<CountDownLatch> fillExecutor(BoundedThreadPool exec) {
71 int numThreads = exec.getMaximumPoolSize();
72 List<CountDownLatch> latches = Lists.newArrayList();
73 final CountDownLatch started = new CountDownLatch(numThreads);
74 List<CountDownLatch> finished = Lists.newArrayList();
75
76 // seed the executor's threads
77 for (int i = 0; i < numThreads; i++) {
78 final CountDownLatch latch = new CountDownLatch(1);
79 final CountDownLatch fin = new CountDownLatch(1);
80 latches.add(latch);
81 finished.add(fin);
82 exec.submit(() -> {
83 try {
84 started.countDown();
85 latch.await();
86 fin.countDown();
87 } catch (InterruptedException e) {
88 fail();
89 }
90 });
91 }
92 try {
93 assertTrue(started.await(100, TimeUnit.MILLISECONDS));
94 } catch (InterruptedException e) {
95 fail();
96 }
97 // fill the queue
98 CountDownLatch startedBlocked = new CountDownLatch(1);
99 while (exec.getQueue().remainingCapacity() > 0) {
100 final CountDownLatch latch = new CountDownLatch(1);
101 latches.add(latch);
102 exec.submit(() -> {
103 try {
104 startedBlocked.countDown();
105 latch.await();
106 } catch (InterruptedException e) {
107 fail();
108 }
109 });
110 }
111
112 latches.remove(0).countDown(); // release one of the executors
113 // ... we need to do this because load is recomputed when jobs are taken
114 // Note: For this to work, 1 / numThreads must be less than the load threshold (0.2)
115
116 // verify that the old job has terminated
117 try {
118 assertTrue("Job didn't finish",
119 finished.remove(0).await(100, TimeUnit.MILLISECONDS));
120 } catch (InterruptedException e) {
121 fail();
122 }
123
124 // verify that a previously blocked thread has started
125 try {
126 assertTrue(startedBlocked.await(10, TimeUnit.MILLISECONDS));
127 } catch (InterruptedException e) {
128 fail();
129 }
130
131
132 // add another job to fill the queue
133 final CountDownLatch latch = new CountDownLatch(1);
134 latches.add(latch);
135 exec.submit(() -> {
136 try {
137 latch.await();
138 } catch (InterruptedException e) {
139 fail();
140 }
141 });
142 assertEquals(exec.getQueue().size(), maxQueueSize);
143
144 return latches;
145 }
146
Yuta HIGUCHI2ca15392017-01-25 16:24:22 -0800147 @Ignore("Disabled due to ONOS-5900")
Brian O'Connorc6713a82015-02-24 11:55:48 -0800148 @Test
149 public void releaseOneThread() {
150 maxQueueSize = 10;
151 BoundedThreadPool exec = newFixedThreadPool(4, namedThreads("test"));
152 List<CountDownLatch> latches = fillExecutor(exec);
153
154 CountDownLatch myLatch = new CountDownLatch(1);
155 ExecutorService myExec = Executors.newSingleThreadExecutor();
156 Future<Thread> expected = myExec.submit(Thread::currentThread);
157
158 assertEquals(exec.getQueue().size(), maxQueueSize);
159 long start = System.nanoTime();
160 Future<Thread> actual = myExec.submit(() -> {
161 return exec.submit(() -> {
162 myLatch.countDown();
163 return Thread.currentThread();
164 }).get();
165 });
166
167 try {
168 assertFalse("Thread should still be blocked",
169 myLatch.await(10, TimeUnit.MILLISECONDS));
170
171 latches.remove(0).countDown(); // release the first thread
172 assertFalse("Thread should still be blocked",
173 myLatch.await(10, TimeUnit.MILLISECONDS));
174 latches.remove(0).countDown(); // release the second thread
175
176 assertTrue("Thread should be unblocked",
HIGUCHI Yuta107ec542016-05-21 12:18:28 -0700177 myLatch.await(10, TimeUnit.SECONDS));
Brian O'Connorc6713a82015-02-24 11:55:48 -0800178 long delta = System.nanoTime() - start;
179 double load = exec.getQueue().size() / (double) maxQueueSize;
180 assertTrue("Load is greater than threshold", load <= 0.8);
181 assertTrue("Load is less than threshold", load >= 0.6);
182 assertEquals("Work done on wrong thread", expected.get(), actual.get());
183 assertTrue("Took more than one second", delta < Math.pow(10, 9));
184 } catch (InterruptedException | ExecutionException e) {
185 fail();
186 } finally {
187 latches.forEach(CountDownLatch::countDown);
188 exec.shutdown();
189 }
190
191 // TODO perhaps move to tearDown
192 try {
193 assertTrue(exec.awaitTermination(1, TimeUnit.SECONDS));
194 } catch (InterruptedException e) {
195 fail();
196 }
197
198 }
199
200 @Test
201 public void highLoadTimeout() {
202 maxQueueSize = 10;
203 BoundedThreadPool exec = newFixedThreadPool(2, namedThreads("test"));
204 List<CountDownLatch> latches = fillExecutor(exec);
205
206 // true if the job is executed and it is done on the test thread
207 final AtomicBoolean sameThread = new AtomicBoolean(false);
208 final Thread myThread = Thread.currentThread();
209 long start = System.nanoTime();
210 exec.submit(() -> {
211 sameThread.set(myThread.equals(Thread.currentThread()));
212 });
213
214 long delta = System.nanoTime() - start;
215 assertEquals(maxQueueSize, exec.getQueue().size());
216 assertTrue("Work done on wrong thread (or didn't happen)", sameThread.get());
217 assertTrue("Took less than one second. Actual: " + delta / 1_000_000.0 + "ms",
218 delta > Math.pow(10, 9));
219 assertTrue("Took more than two seconds", delta < 2 * Math.pow(10, 9));
220 latches.forEach(CountDownLatch::countDown);
221 exec.shutdown();
222
223 // TODO perhaps move to tearDown
224 try {
225 assertTrue(exec.awaitTermination(1, TimeUnit.SECONDS));
226 } catch (InterruptedException e) {
227 fail();
228 }
229 }
230}