blob: 64dbb403e1d4c0f56db9b60f2f245f8661ab3217 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import java.util.LinkedList;
19import java.util.Queue;
20import java.util.concurrent.CompletableFuture;
21import java.util.function.Consumer;
22
23import org.junit.Test;
24
25import static org.junit.Assert.*;
26
27import org.onosproject.cluster.Leadership;
28import org.onosproject.cluster.NodeId;
29import org.onosproject.event.Change;
30
31import io.atomix.Atomix;
32import io.atomix.resource.ResourceType;
33
34/**
35 * Unit tests for {@link AtomixLeaderElector}.
36 */
Madan Jampani5e5b3d62016-02-01 16:03:33 -080037public class AtomixLeaderElectorTest extends AtomixTestBase {
38
39 NodeId node1 = new NodeId("node1");
40 NodeId node2 = new NodeId("node2");
41 NodeId node3 = new NodeId("node3");
42
43 @Override
44 protected ResourceType resourceType() {
45 return new ResourceType(AtomixLeaderElector.class);
46 }
47
48 @Test
49 public void testRun() throws Throwable {
50 leaderElectorRunTests(1);
51 clearTests();
Madan Jampani0c0cdc62016-02-22 16:54:06 -080052 leaderElectorRunTests(2);
53 clearTests();
54 leaderElectorRunTests(3);
55 clearTests();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080056 }
57
58 private void leaderElectorRunTests(int numServers) throws Throwable {
59 createCopycatServers(numServers);
60 Atomix client1 = createAtomixClient();
61 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
62 elector1.run("foo", node1).thenAccept(result -> {
63 assertEquals(node1, result.leaderNodeId());
64 assertEquals(1, result.leader().term());
65 assertEquals(1, result.candidates().size());
66 assertEquals(node1, result.candidates().get(0));
67 }).join();
68 Atomix client2 = createAtomixClient();
69 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
70 elector2.run("foo", node2).thenAccept(result -> {
71 assertEquals(node1, result.leaderNodeId());
72 assertEquals(1, result.leader().term());
73 assertEquals(2, result.candidates().size());
74 assertEquals(node1, result.candidates().get(0));
75 assertEquals(node2, result.candidates().get(1));
76 }).join();
77 }
78
79 @Test
80 public void testWithdraw() throws Throwable {
81 leaderElectorWithdrawTests(1);
82 clearTests();
83 leaderElectorWithdrawTests(2);
84 clearTests();
85 leaderElectorWithdrawTests(3);
86 clearTests();
87 }
88
89 private void leaderElectorWithdrawTests(int numServers) throws Throwable {
90 createCopycatServers(numServers);
91 Atomix client1 = createAtomixClient();
92 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
93 elector1.run("foo", node1).join();
94 Atomix client2 = createAtomixClient();
95 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
96 elector2.run("foo", node2).join();
97
98 LeaderEventListener listener1 = new LeaderEventListener();
99 elector1.addChangeListener(listener1).join();
100
101 LeaderEventListener listener2 = new LeaderEventListener();
102 elector2.addChangeListener(listener2).join();
103
104 elector1.withdraw("foo").join();
105
106 listener1.nextEvent().thenAccept(result -> {
107 assertEquals(node2, result.newValue().leaderNodeId());
108 assertEquals(2, result.newValue().leader().term());
109 assertEquals(1, result.newValue().candidates().size());
110 assertEquals(node2, result.newValue().candidates().get(0));
111 }).join();
112
113 listener2.nextEvent().thenAccept(result -> {
114 assertEquals(node2, result.newValue().leaderNodeId());
115 assertEquals(2, result.newValue().leader().term());
116 assertEquals(1, result.newValue().candidates().size());
117 assertEquals(node2, result.newValue().candidates().get(0));
118 }).join();
119 }
120
121 @Test
122 public void testAnoint() throws Throwable {
123 leaderElectorAnointTests(1);
124 clearTests();
125 leaderElectorAnointTests(2);
126 clearTests();
127 leaderElectorAnointTests(3);
128 clearTests();
129 }
130
131 private void leaderElectorAnointTests(int numServers) throws Throwable {
132 createCopycatServers(numServers);
133 Atomix client1 = createAtomixClient();
134 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
135 Atomix client2 = createAtomixClient();
136 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
137 Atomix client3 = createAtomixClient();
138 AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
139 elector1.run("foo", node1).join();
140 elector2.run("foo", node2).join();
141
142 LeaderEventListener listener1 = new LeaderEventListener();
143 elector1.addChangeListener(listener1).join();
144 LeaderEventListener listener2 = new LeaderEventListener();
145 elector2.addChangeListener(listener2);
146 LeaderEventListener listener3 = new LeaderEventListener();
147 elector3.addChangeListener(listener3).join();
148
149 elector3.anoint("foo", node3).thenAccept(result -> {
150 assertFalse(result);
151 }).join();
152 assertFalse(listener1.hasEvent());
153 assertFalse(listener2.hasEvent());
154 assertFalse(listener3.hasEvent());
155
156 elector3.anoint("foo", node2).thenAccept(result -> {
157 assertTrue(result);
158 }).join();
159 assertTrue(listener1.hasEvent());
160 assertTrue(listener2.hasEvent());
161 assertTrue(listener3.hasEvent());
162
163 listener1.nextEvent().thenAccept(result -> {
164 assertEquals(node2, result.newValue().leaderNodeId());
165 assertEquals(2, result.newValue().candidates().size());
166 assertEquals(node1, result.newValue().candidates().get(0));
167 assertEquals(node2, result.newValue().candidates().get(1));
168 }).join();
169 listener2.nextEvent().thenAccept(result -> {
170 assertEquals(node2, result.newValue().leaderNodeId());
171 assertEquals(2, result.newValue().candidates().size());
172 assertEquals(node1, result.newValue().candidates().get(0));
173 assertEquals(node2, result.newValue().candidates().get(1));
174 }).join();
175 listener3.nextEvent().thenAccept(result -> {
176 assertEquals(node2, result.newValue().leaderNodeId());
177 assertEquals(2, result.newValue().candidates().size());
178 assertEquals(node1, result.newValue().candidates().get(0));
179 assertEquals(node2, result.newValue().candidates().get(1));
180 }).join();
181 }
182
183 @Test
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800184 public void testPromote() throws Throwable {
185 leaderElectorPromoteTests(1);
186 clearTests();
187 leaderElectorPromoteTests(2);
188 clearTests();
189 leaderElectorPromoteTests(3);
190 clearTests();
191 }
192
193 private void leaderElectorPromoteTests(int numServers) throws Throwable {
194 createCopycatServers(numServers);
195 Atomix client1 = createAtomixClient();
196 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
197 Atomix client2 = createAtomixClient();
198 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
199 Atomix client3 = createAtomixClient();
200 AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
201 elector1.run("foo", node1).join();
202 elector2.run("foo", node2).join();
203
204 LeaderEventListener listener1 = new LeaderEventListener();
205 elector1.addChangeListener(listener1).join();
206 LeaderEventListener listener2 = new LeaderEventListener();
207 elector2.addChangeListener(listener2).join();
208 LeaderEventListener listener3 = new LeaderEventListener();
209 elector3.addChangeListener(listener3).join();
210
211 elector3.promote("foo", node3).thenAccept(result -> {
212 assertFalse(result);
213 }).join();
214
215 assertFalse(listener1.hasEvent());
216 assertFalse(listener2.hasEvent());
217 assertFalse(listener3.hasEvent());
218
219 elector3.run("foo", node3).join();
220
221 listener1.clearEvents();
222 listener2.clearEvents();
223 listener3.clearEvents();
224
225 elector3.promote("foo", node3).thenAccept(result -> {
226 assertTrue(result);
227 }).join();
228
229 listener1.nextEvent().thenAccept(result -> {
230 assertEquals(node3, result.newValue().candidates().get(0));
231 }).join();
232 listener2.nextEvent().thenAccept(result -> {
233 assertEquals(node3, result.newValue().candidates().get(0));
234 }).join();
235 listener3.nextEvent().thenAccept(result -> {
236 assertEquals(node3, result.newValue().candidates().get(0));
237 }).join();
238 }
239
240 @Test
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800241 public void testLeaderSessionClose() throws Throwable {
242 leaderElectorLeaderSessionCloseTests(1);
243 clearTests();
244 leaderElectorLeaderSessionCloseTests(2);
245 clearTests();
246 leaderElectorLeaderSessionCloseTests(3);
247 clearTests();
248 }
249
250 private void leaderElectorLeaderSessionCloseTests(int numServers) throws Throwable {
251 createCopycatServers(numServers);
252 Atomix client1 = createAtomixClient();
253 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
254 elector1.run("foo", node1).join();
255 Atomix client2 = createAtomixClient();
256 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
257 LeaderEventListener listener = new LeaderEventListener();
258 elector2.run("foo", node2).join();
259 elector2.addChangeListener(listener).join();
260 client1.close();
261 listener.nextEvent().thenAccept(result -> {
262 assertEquals(node2, result.newValue().leaderNodeId());
263 assertEquals(1, result.newValue().candidates().size());
264 assertEquals(node2, result.newValue().candidates().get(0));
265 }).join();
266 }
267
268 @Test
269 public void testNonLeaderSessionClose() throws Throwable {
270 leaderElectorNonLeaderSessionCloseTests(1);
271 clearTests();
272 leaderElectorNonLeaderSessionCloseTests(2);
273 clearTests();
274 leaderElectorNonLeaderSessionCloseTests(3);
275 clearTests();
276 }
277
278 private void leaderElectorNonLeaderSessionCloseTests(int numServers) throws Throwable {
279 createCopycatServers(numServers);
280 Atomix client1 = createAtomixClient();
281 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
282 elector1.run("foo", node1).join();
283 Atomix client2 = createAtomixClient();
284 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
285 LeaderEventListener listener = new LeaderEventListener();
286 elector2.run("foo", node2).join();
287 elector1.addChangeListener(listener).join();
288 client2.close().join();
289 listener.nextEvent().thenAccept(result -> {
290 assertEquals(node1, result.newValue().leaderNodeId());
291 assertEquals(1, result.newValue().candidates().size());
292 assertEquals(node1, result.newValue().candidates().get(0));
293 }).join();
294 }
295
296 @Test
297 public void testQueries() throws Throwable {
298 leaderElectorQueryTests(1);
299 clearTests();
300 leaderElectorQueryTests(2);
301 clearTests();
302 leaderElectorQueryTests(3);
303 clearTests();
304 }
305
306 private void leaderElectorQueryTests(int numServers) throws Throwable {
307 createCopycatServers(numServers);
308 Atomix client1 = createAtomixClient();
309 Atomix client2 = createAtomixClient();
310 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
311 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
312 elector1.run("foo", node1).join();
313 elector2.run("foo", node2).join();
314 elector2.run("bar", node2).join();
315 elector1.getElectedTopics(node1).thenAccept(result -> {
316 assertEquals(1, result.size());
317 assertTrue(result.contains("foo"));
318 }).join();
319 elector2.getElectedTopics(node1).thenAccept(result -> {
320 assertEquals(1, result.size());
321 assertTrue(result.contains("foo"));
322 }).join();
323 elector1.getLeadership("foo").thenAccept(result -> {
324 assertEquals(node1, result.leaderNodeId());
325 assertEquals(node1, result.candidates().get(0));
326 assertEquals(node2, result.candidates().get(1));
327 }).join();
328 elector2.getLeadership("foo").thenAccept(result -> {
329 assertEquals(node1, result.leaderNodeId());
330 assertEquals(node1, result.candidates().get(0));
331 assertEquals(node2, result.candidates().get(1));
332 }).join();
333 elector1.getLeadership("bar").thenAccept(result -> {
334 assertEquals(node2, result.leaderNodeId());
335 assertEquals(node2, result.candidates().get(0));
336 }).join();
337 elector2.getLeadership("bar").thenAccept(result -> {
338 assertEquals(node2, result.leaderNodeId());
339 assertEquals(node2, result.candidates().get(0));
340 }).join();
341 elector1.getLeaderships().thenAccept(result -> {
342 assertEquals(2, result.size());
343 Leadership fooLeadership = result.get("foo");
344 assertEquals(node1, fooLeadership.leaderNodeId());
345 assertEquals(node1, fooLeadership.candidates().get(0));
346 assertEquals(node2, fooLeadership.candidates().get(1));
347 Leadership barLeadership = result.get("bar");
348 assertEquals(node2, barLeadership.leaderNodeId());
349 assertEquals(node2, barLeadership.candidates().get(0));
350 }).join();
351 elector2.getLeaderships().thenAccept(result -> {
352 assertEquals(2, result.size());
353 Leadership fooLeadership = result.get("foo");
354 assertEquals(node1, fooLeadership.leaderNodeId());
355 assertEquals(node1, fooLeadership.candidates().get(0));
356 assertEquals(node2, fooLeadership.candidates().get(1));
357 Leadership barLeadership = result.get("bar");
358 assertEquals(node2, barLeadership.leaderNodeId());
359 assertEquals(node2, barLeadership.candidates().get(0));
360 }).join();
361 }
362
363 private static class LeaderEventListener implements Consumer<Change<Leadership>> {
364 Queue<Change<Leadership>> eventQueue = new LinkedList<>();
365 CompletableFuture<Change<Leadership>> pendingFuture;
366
367 @Override
368 public void accept(Change<Leadership> change) {
369 synchronized (this) {
370 if (pendingFuture != null) {
371 pendingFuture.complete(change);
372 pendingFuture = null;
373 } else {
374 eventQueue.add(change);
375 }
376 }
377 }
378
379 public boolean hasEvent() {
380 return !eventQueue.isEmpty();
381 }
382
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800383 public void clearEvents() {
384 eventQueue.clear();
385 }
386
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800387 public CompletableFuture<Change<Leadership>> nextEvent() {
388 synchronized (this) {
389 if (eventQueue.isEmpty()) {
390 if (pendingFuture == null) {
391 pendingFuture = new CompletableFuture<>();
392 }
393 return pendingFuture;
394 } else {
395 return CompletableFuture.completedFuture(eventQueue.poll());
396 }
397 }
398 }
399 }
400}