/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.resources.impl;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.resource.ResourceType;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;

import com.google.common.util.concurrent.Uninterruptibles;
41
Madan Jampani35708a92016-07-06 10:48:19 -070042/**
43 * Unit tests for {@link AtomixWorkQueue}.
44 */
45public class AtomixWorkQueueTest extends AtomixTestBase {
46
47 private static final Duration DEFAULT_PROCESSING_TIME = Duration.ofMillis(100);
48 private static final byte[] DEFAULT_PAYLOAD = "hello world".getBytes();
49
50 @BeforeClass
51 public static void preTestSetup() throws Throwable {
52 createCopycatServers(1);
53 }
54
55 @AfterClass
56 public static void postTestCleanup() throws Exception {
57 clearTests();
58 }
59
60 @Override
61 protected ResourceType resourceType() {
62 return new ResourceType(AtomixWorkQueue.class);
63 }
64
65 @Test
66 public void testAdd() throws Throwable {
67 String queueName = UUID.randomUUID().toString();
68 Atomix atomix1 = createAtomixClient();
69 AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
70 byte[] item = DEFAULT_PAYLOAD;
71 queue1.addOne(item).join();
72
73 Atomix atomix2 = createAtomixClient();
74 AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
75 byte[] task2 = DEFAULT_PAYLOAD;
76 queue2.addOne(task2).join();
77
78 WorkQueueStats stats = queue1.stats().join();
79 assertEquals(stats.totalPending(), 2);
80 assertEquals(stats.totalInProgress(), 0);
81 assertEquals(stats.totalCompleted(), 0);
82 }
83
84 @Test
85 public void testAddMultiple() throws Throwable {
86 String queueName = UUID.randomUUID().toString();
87 Atomix atomix1 = createAtomixClient();
88 AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
89 byte[] item1 = DEFAULT_PAYLOAD;
90 byte[] item2 = DEFAULT_PAYLOAD;
91 queue1.addMultiple(Arrays.asList(item1, item2)).join();
92
93 WorkQueueStats stats = queue1.stats().join();
94 assertEquals(stats.totalPending(), 2);
95 assertEquals(stats.totalInProgress(), 0);
96 assertEquals(stats.totalCompleted(), 0);
97 }
98
99 @Test
100 public void testTakeAndComplete() throws Throwable {
101 String queueName = UUID.randomUUID().toString();
102 Atomix atomix1 = createAtomixClient();
103 AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
104 byte[] item1 = DEFAULT_PAYLOAD;
105 queue1.addOne(item1).join();
106
107 Atomix atomix2 = createAtomixClient();
108 AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
109 Task<byte[]> removedTask = queue2.take().join();
110
111 WorkQueueStats stats = queue2.stats().join();
112 assertEquals(stats.totalPending(), 0);
113 assertEquals(stats.totalInProgress(), 1);
114 assertEquals(stats.totalCompleted(), 0);
115
116 assertTrue(Arrays.equals(removedTask.payload(), item1));
117 queue2.complete(Arrays.asList(removedTask.taskId())).join();
118
119 stats = queue1.stats().join();
120 assertEquals(stats.totalPending(), 0);
121 assertEquals(stats.totalInProgress(), 0);
122 assertEquals(stats.totalCompleted(), 1);
123
124 // Another take should return null
125 assertNull(queue2.take().join());
126 }
127
128 @Test
129 public void testUnexpectedClientClose() throws Throwable {
130 String queueName = UUID.randomUUID().toString();
131 Atomix atomix1 = createAtomixClient();
132 AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
133 byte[] item1 = DEFAULT_PAYLOAD;
134 queue1.addOne(item1).join();
135
136 AtomixClient atomix2 = createAtomixClient();
137 AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
138 queue2.take().join();
139
140 WorkQueueStats stats = queue1.stats().join();
141 assertEquals(0, stats.totalPending());
142 assertEquals(1, stats.totalInProgress());
143 assertEquals(0, stats.totalCompleted());
144
145 atomix2.close().join();
146
147 stats = queue1.stats().join();
148 assertEquals(1, stats.totalPending());
149 assertEquals(0, stats.totalInProgress());
150 assertEquals(0, stats.totalCompleted());
151 }
152
153 @Test
154 public void testAutomaticTaskProcessing() throws Throwable {
155 String queueName = UUID.randomUUID().toString();
156 Atomix atomix1 = createAtomixClient();
157 AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
158 Executor executor = Executors.newSingleThreadExecutor();
159
160 CountDownLatch latch1 = new CountDownLatch(1);
161 queue1.registerTaskProcessor(s -> latch1.countDown(), 2, executor);
162
163 AtomixClient atomix2 = createAtomixClient();
164 AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
165 byte[] item1 = DEFAULT_PAYLOAD;
166 queue2.addOne(item1).join();
167
Aaron Kruglikov4fc7c262017-01-26 11:55:21 -0800168 assertTrue(Uninterruptibles.awaitUninterruptibly(latch1, 5000, TimeUnit.MILLISECONDS));
Madan Jampani35708a92016-07-06 10:48:19 -0700169 queue1.stopProcessing();
170
171 byte[] item2 = DEFAULT_PAYLOAD;
172 byte[] item3 = DEFAULT_PAYLOAD;
173
174 Tools.delay((int) DEFAULT_PROCESSING_TIME.toMillis());
175
176 queue2.addMultiple(Arrays.asList(item2, item3)).join();
177
178 WorkQueueStats stats = queue1.stats().join();
179 assertEquals(2, stats.totalPending());
180 assertEquals(0, stats.totalInProgress());
181 assertEquals(1, stats.totalCompleted());
182
183 CountDownLatch latch2 = new CountDownLatch(2);
184 queue1.registerTaskProcessor(s -> latch2.countDown(), 2, executor);
185
186 Uninterruptibles.awaitUninterruptibly(latch2, 500, TimeUnit.MILLISECONDS);
187 }
Madan Jampani819d61d2016-07-25 20:29:43 -0700188
189 @Test
190 public void testDestroy() {
191 String queueName = UUID.randomUUID().toString();
192 Atomix atomix1 = createAtomixClient();
193 AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
194 byte[] item = DEFAULT_PAYLOAD;
195 queue1.addOne(item).join();
196
197 Atomix atomix2 = createAtomixClient();
198 AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
199 byte[] task2 = DEFAULT_PAYLOAD;
200 queue2.addOne(task2).join();
201
202 WorkQueueStats stats = queue1.stats().join();
203 assertEquals(stats.totalPending(), 2);
204 assertEquals(stats.totalInProgress(), 0);
205 assertEquals(stats.totalCompleted(), 0);
206
207 queue2.destroy().join();
208
209 stats = queue1.stats().join();
210 assertEquals(stats.totalPending(), 0);
211 assertEquals(stats.totalInProgress(), 0);
212 assertEquals(stats.totalCompleted(), 0);
213 }
214
215 @Test
216 public void testCompleteAttemptWithIncorrectSession() {
217 String queueName = UUID.randomUUID().toString();
218 Atomix atomix1 = createAtomixClient();
219 AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
220 byte[] item = DEFAULT_PAYLOAD;
221 queue1.addOne(item).join();
222
223 Task<byte[]> task = queue1.take().join();
224 String taskId = task.taskId();
225
226 // Create another client and get a handle to the same queue.
227 Atomix atomix2 = createAtomixClient();
228 AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
229
230 // Attempt completing the task with new client and verify task is not completed
231 queue2.complete(taskId).join();
232
233 WorkQueueStats stats = queue1.stats().join();
234 assertEquals(stats.totalPending(), 0);
235 assertEquals(stats.totalInProgress(), 1);
236 assertEquals(stats.totalCompleted(), 0);
237 }
Madan Jampani35708a92016-07-06 10:48:19 -0700238}