blob: 0553a1ebb58a3d682f4f48cf4ed96d1335a9ef64 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.resources.impl;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.Uninterruptibles;
import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.protocols.raft.service.RaftService;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

Madan Jampani35708a92016-07-06 10:48:19 -070038/**
39 * Unit tests for {@link AtomixWorkQueue}.
40 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070041public class AtomixWorkQueueTest extends AtomixTestBase<AtomixWorkQueue> {
Madan Jampani35708a92016-07-06 10:48:19 -070042 private static final Duration DEFAULT_PROCESSING_TIME = Duration.ofMillis(100);
43 private static final byte[] DEFAULT_PAYLOAD = "hello world".getBytes();
44
Jordan Halterman2bf177c2017-06-29 01:49:08 -070045 @Override
46 protected RaftService createService() {
47 return new AtomixWorkQueueService();
Madan Jampani35708a92016-07-06 10:48:19 -070048 }
49
50 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -070051 protected AtomixWorkQueue createPrimitive(RaftProxy proxy) {
52 return new AtomixWorkQueue(proxy);
Madan Jampani35708a92016-07-06 10:48:19 -070053 }
54
55 @Test
56 public void testAdd() throws Throwable {
57 String queueName = UUID.randomUUID().toString();
Jordan Halterman2bf177c2017-06-29 01:49:08 -070058 AtomixWorkQueue queue1 = newPrimitive(queueName);
Madan Jampani35708a92016-07-06 10:48:19 -070059 byte[] item = DEFAULT_PAYLOAD;
60 queue1.addOne(item).join();
61
Jordan Halterman2bf177c2017-06-29 01:49:08 -070062 AtomixWorkQueue queue2 = newPrimitive(queueName);
Madan Jampani35708a92016-07-06 10:48:19 -070063 byte[] task2 = DEFAULT_PAYLOAD;
64 queue2.addOne(task2).join();
65
66 WorkQueueStats stats = queue1.stats().join();
67 assertEquals(stats.totalPending(), 2);
68 assertEquals(stats.totalInProgress(), 0);
69 assertEquals(stats.totalCompleted(), 0);
70 }
71
72 @Test
73 public void testAddMultiple() throws Throwable {
74 String queueName = UUID.randomUUID().toString();
Jordan Halterman2bf177c2017-06-29 01:49:08 -070075 AtomixWorkQueue queue1 = newPrimitive(queueName);
Madan Jampani35708a92016-07-06 10:48:19 -070076 byte[] item1 = DEFAULT_PAYLOAD;
77 byte[] item2 = DEFAULT_PAYLOAD;
78 queue1.addMultiple(Arrays.asList(item1, item2)).join();
79
80 WorkQueueStats stats = queue1.stats().join();
81 assertEquals(stats.totalPending(), 2);
82 assertEquals(stats.totalInProgress(), 0);
83 assertEquals(stats.totalCompleted(), 0);
84 }
85
86 @Test
87 public void testTakeAndComplete() throws Throwable {
88 String queueName = UUID.randomUUID().toString();
Jordan Halterman2bf177c2017-06-29 01:49:08 -070089 AtomixWorkQueue queue1 = newPrimitive(queueName);
Madan Jampani35708a92016-07-06 10:48:19 -070090 byte[] item1 = DEFAULT_PAYLOAD;
91 queue1.addOne(item1).join();
92
Jordan Halterman2bf177c2017-06-29 01:49:08 -070093 AtomixWorkQueue queue2 = newPrimitive(queueName);
Madan Jampani35708a92016-07-06 10:48:19 -070094 Task<byte[]> removedTask = queue2.take().join();
95
96 WorkQueueStats stats = queue2.stats().join();
97 assertEquals(stats.totalPending(), 0);
98 assertEquals(stats.totalInProgress(), 1);
99 assertEquals(stats.totalCompleted(), 0);
100
101 assertTrue(Arrays.equals(removedTask.payload(), item1));
102 queue2.complete(Arrays.asList(removedTask.taskId())).join();
103
104 stats = queue1.stats().join();
105 assertEquals(stats.totalPending(), 0);
106 assertEquals(stats.totalInProgress(), 0);
107 assertEquals(stats.totalCompleted(), 1);
108
109 // Another take should return null
110 assertNull(queue2.take().join());
111 }
112
113 @Test
114 public void testUnexpectedClientClose() throws Throwable {
115 String queueName = UUID.randomUUID().toString();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700116 AtomixWorkQueue queue1 = newPrimitive(queueName);
Madan Jampani35708a92016-07-06 10:48:19 -0700117 byte[] item1 = DEFAULT_PAYLOAD;
118 queue1.addOne(item1).join();
119
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700120 AtomixWorkQueue queue2 = newPrimitive(queueName);
Madan Jampani35708a92016-07-06 10:48:19 -0700121 queue2.take().join();
122
123 WorkQueueStats stats = queue1.stats().join();
124 assertEquals(0, stats.totalPending());
125 assertEquals(1, stats.totalInProgress());
126 assertEquals(0, stats.totalCompleted());
127
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700128 queue2.proxy.close().join();
Madan Jampani35708a92016-07-06 10:48:19 -0700129
130 stats = queue1.stats().join();
131 assertEquals(1, stats.totalPending());
132 assertEquals(0, stats.totalInProgress());
133 assertEquals(0, stats.totalCompleted());
134 }
135
136 @Test
137 public void testAutomaticTaskProcessing() throws Throwable {
138 String queueName = UUID.randomUUID().toString();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700139 AtomixWorkQueue queue1 = newPrimitive(queueName);
Madan Jampani35708a92016-07-06 10:48:19 -0700140 Executor executor = Executors.newSingleThreadExecutor();
141
142 CountDownLatch latch1 = new CountDownLatch(1);
143 queue1.registerTaskProcessor(s -> latch1.countDown(), 2, executor);
144
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700145 AtomixWorkQueue queue2 = newPrimitive(queueName);
Madan Jampani35708a92016-07-06 10:48:19 -0700146 byte[] item1 = DEFAULT_PAYLOAD;
147 queue2.addOne(item1).join();
148
Aaron Kruglikov4fc7c262017-01-26 11:55:21 -0800149 assertTrue(Uninterruptibles.awaitUninterruptibly(latch1, 5000, TimeUnit.MILLISECONDS));
Madan Jampani35708a92016-07-06 10:48:19 -0700150 queue1.stopProcessing();
151
152 byte[] item2 = DEFAULT_PAYLOAD;
153 byte[] item3 = DEFAULT_PAYLOAD;
154
155 Tools.delay((int) DEFAULT_PROCESSING_TIME.toMillis());
156
157 queue2.addMultiple(Arrays.asList(item2, item3)).join();
158
159 WorkQueueStats stats = queue1.stats().join();
160 assertEquals(2, stats.totalPending());
161 assertEquals(0, stats.totalInProgress());
162 assertEquals(1, stats.totalCompleted());
163
164 CountDownLatch latch2 = new CountDownLatch(2);
165 queue1.registerTaskProcessor(s -> latch2.countDown(), 2, executor);
166
167 Uninterruptibles.awaitUninterruptibly(latch2, 500, TimeUnit.MILLISECONDS);
168 }
Madan Jampani819d61d2016-07-25 20:29:43 -0700169
170 @Test
171 public void testDestroy() {
172 String queueName = UUID.randomUUID().toString();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700173 AtomixWorkQueue queue1 = newPrimitive(queueName);
Madan Jampani819d61d2016-07-25 20:29:43 -0700174 byte[] item = DEFAULT_PAYLOAD;
175 queue1.addOne(item).join();
176
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700177 AtomixWorkQueue queue2 = newPrimitive(queueName);
Madan Jampani819d61d2016-07-25 20:29:43 -0700178 byte[] task2 = DEFAULT_PAYLOAD;
179 queue2.addOne(task2).join();
180
181 WorkQueueStats stats = queue1.stats().join();
182 assertEquals(stats.totalPending(), 2);
183 assertEquals(stats.totalInProgress(), 0);
184 assertEquals(stats.totalCompleted(), 0);
185
186 queue2.destroy().join();
187
188 stats = queue1.stats().join();
189 assertEquals(stats.totalPending(), 0);
190 assertEquals(stats.totalInProgress(), 0);
191 assertEquals(stats.totalCompleted(), 0);
192 }
193
194 @Test
195 public void testCompleteAttemptWithIncorrectSession() {
196 String queueName = UUID.randomUUID().toString();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700197 AtomixWorkQueue queue1 = newPrimitive(queueName);
Madan Jampani819d61d2016-07-25 20:29:43 -0700198 byte[] item = DEFAULT_PAYLOAD;
199 queue1.addOne(item).join();
200
201 Task<byte[]> task = queue1.take().join();
202 String taskId = task.taskId();
203
204 // Create another client and get a handle to the same queue.
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700205 AtomixWorkQueue queue2 = newPrimitive(queueName);
Madan Jampani819d61d2016-07-25 20:29:43 -0700206
207 // Attempt completing the task with new client and verify task is not completed
208 queue2.complete(taskId).join();
209
210 WorkQueueStats stats = queue1.stats().join();
211 assertEquals(stats.totalPending(), 0);
212 assertEquals(stats.totalInProgress(), 1);
213 assertEquals(stats.totalCompleted(), 0);
214 }
Madan Jampani35708a92016-07-06 10:48:19 -0700215}