blob: 7d77d0358f50474c73c66f46323f94a5dc85100b [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import java.util.LinkedList;
19import java.util.Queue;
20import java.util.concurrent.CompletableFuture;
21import java.util.function.Consumer;
22
23import org.junit.Test;
24
25import static org.junit.Assert.*;
26
27import org.onosproject.cluster.Leadership;
28import org.onosproject.cluster.NodeId;
29import org.onosproject.event.Change;
30
31import io.atomix.Atomix;
32import io.atomix.resource.ResourceType;
33
34/**
35 * Unit tests for {@link AtomixLeaderElector}.
36 */
37public class AtomixLeaderElectorTest extends AtomixTestBase {
38
39 NodeId node1 = new NodeId("node1");
40 NodeId node2 = new NodeId("node2");
41 NodeId node3 = new NodeId("node3");
42
43 @Override
44 protected ResourceType resourceType() {
45 return new ResourceType(AtomixLeaderElector.class);
46 }
47
48 @Test
49 public void testRun() throws Throwable {
50 leaderElectorRunTests(1);
51 clearTests();
52// leaderElectorRunTests(2);
53// clearTests();
54// leaderElectorRunTests(3);
55// clearTests();
56 }
57
58 private void leaderElectorRunTests(int numServers) throws Throwable {
59 createCopycatServers(numServers);
60 Atomix client1 = createAtomixClient();
61 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
62 elector1.run("foo", node1).thenAccept(result -> {
63 assertEquals(node1, result.leaderNodeId());
64 assertEquals(1, result.leader().term());
65 assertEquals(1, result.candidates().size());
66 assertEquals(node1, result.candidates().get(0));
67 }).join();
68 Atomix client2 = createAtomixClient();
69 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
70 elector2.run("foo", node2).thenAccept(result -> {
71 assertEquals(node1, result.leaderNodeId());
72 assertEquals(1, result.leader().term());
73 assertEquals(2, result.candidates().size());
74 assertEquals(node1, result.candidates().get(0));
75 assertEquals(node2, result.candidates().get(1));
76 }).join();
77 }
78
79 @Test
80 public void testWithdraw() throws Throwable {
81 leaderElectorWithdrawTests(1);
82 clearTests();
83 leaderElectorWithdrawTests(2);
84 clearTests();
85 leaderElectorWithdrawTests(3);
86 clearTests();
87 }
88
89 private void leaderElectorWithdrawTests(int numServers) throws Throwable {
90 createCopycatServers(numServers);
91 Atomix client1 = createAtomixClient();
92 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
93 elector1.run("foo", node1).join();
94 Atomix client2 = createAtomixClient();
95 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
96 elector2.run("foo", node2).join();
97
98 LeaderEventListener listener1 = new LeaderEventListener();
99 elector1.addChangeListener(listener1).join();
100
101 LeaderEventListener listener2 = new LeaderEventListener();
102 elector2.addChangeListener(listener2).join();
103
104 elector1.withdraw("foo").join();
105
106 listener1.nextEvent().thenAccept(result -> {
107 assertEquals(node2, result.newValue().leaderNodeId());
108 assertEquals(2, result.newValue().leader().term());
109 assertEquals(1, result.newValue().candidates().size());
110 assertEquals(node2, result.newValue().candidates().get(0));
111 }).join();
112
113 listener2.nextEvent().thenAccept(result -> {
114 assertEquals(node2, result.newValue().leaderNodeId());
115 assertEquals(2, result.newValue().leader().term());
116 assertEquals(1, result.newValue().candidates().size());
117 assertEquals(node2, result.newValue().candidates().get(0));
118 }).join();
119 }
120
121 @Test
122 public void testAnoint() throws Throwable {
123 leaderElectorAnointTests(1);
124 clearTests();
125 leaderElectorAnointTests(2);
126 clearTests();
127 leaderElectorAnointTests(3);
128 clearTests();
129 }
130
131 private void leaderElectorAnointTests(int numServers) throws Throwable {
132 createCopycatServers(numServers);
133 Atomix client1 = createAtomixClient();
134 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
135 Atomix client2 = createAtomixClient();
136 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
137 Atomix client3 = createAtomixClient();
138 AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
139 elector1.run("foo", node1).join();
140 elector2.run("foo", node2).join();
141
142 LeaderEventListener listener1 = new LeaderEventListener();
143 elector1.addChangeListener(listener1).join();
144 LeaderEventListener listener2 = new LeaderEventListener();
145 elector2.addChangeListener(listener2);
146 LeaderEventListener listener3 = new LeaderEventListener();
147 elector3.addChangeListener(listener3).join();
148
149 elector3.anoint("foo", node3).thenAccept(result -> {
150 assertFalse(result);
151 }).join();
152 assertFalse(listener1.hasEvent());
153 assertFalse(listener2.hasEvent());
154 assertFalse(listener3.hasEvent());
155
156 elector3.anoint("foo", node2).thenAccept(result -> {
157 assertTrue(result);
158 }).join();
159 assertTrue(listener1.hasEvent());
160 assertTrue(listener2.hasEvent());
161 assertTrue(listener3.hasEvent());
162
163 listener1.nextEvent().thenAccept(result -> {
164 assertEquals(node2, result.newValue().leaderNodeId());
165 assertEquals(2, result.newValue().candidates().size());
166 assertEquals(node1, result.newValue().candidates().get(0));
167 assertEquals(node2, result.newValue().candidates().get(1));
168 }).join();
169 listener2.nextEvent().thenAccept(result -> {
170 assertEquals(node2, result.newValue().leaderNodeId());
171 assertEquals(2, result.newValue().candidates().size());
172 assertEquals(node1, result.newValue().candidates().get(0));
173 assertEquals(node2, result.newValue().candidates().get(1));
174 }).join();
175 listener3.nextEvent().thenAccept(result -> {
176 assertEquals(node2, result.newValue().leaderNodeId());
177 assertEquals(2, result.newValue().candidates().size());
178 assertEquals(node1, result.newValue().candidates().get(0));
179 assertEquals(node2, result.newValue().candidates().get(1));
180 }).join();
181 }
182
183 @Test
184 public void testLeaderSessionClose() throws Throwable {
185 leaderElectorLeaderSessionCloseTests(1);
186 clearTests();
187 leaderElectorLeaderSessionCloseTests(2);
188 clearTests();
189 leaderElectorLeaderSessionCloseTests(3);
190 clearTests();
191 }
192
193 private void leaderElectorLeaderSessionCloseTests(int numServers) throws Throwable {
194 createCopycatServers(numServers);
195 Atomix client1 = createAtomixClient();
196 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
197 elector1.run("foo", node1).join();
198 Atomix client2 = createAtomixClient();
199 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
200 LeaderEventListener listener = new LeaderEventListener();
201 elector2.run("foo", node2).join();
202 elector2.addChangeListener(listener).join();
203 client1.close();
204 listener.nextEvent().thenAccept(result -> {
205 assertEquals(node2, result.newValue().leaderNodeId());
206 assertEquals(1, result.newValue().candidates().size());
207 assertEquals(node2, result.newValue().candidates().get(0));
208 }).join();
209 }
210
211 @Test
212 public void testNonLeaderSessionClose() throws Throwable {
213 leaderElectorNonLeaderSessionCloseTests(1);
214 clearTests();
215 leaderElectorNonLeaderSessionCloseTests(2);
216 clearTests();
217 leaderElectorNonLeaderSessionCloseTests(3);
218 clearTests();
219 }
220
221 private void leaderElectorNonLeaderSessionCloseTests(int numServers) throws Throwable {
222 createCopycatServers(numServers);
223 Atomix client1 = createAtomixClient();
224 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
225 elector1.run("foo", node1).join();
226 Atomix client2 = createAtomixClient();
227 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
228 LeaderEventListener listener = new LeaderEventListener();
229 elector2.run("foo", node2).join();
230 elector1.addChangeListener(listener).join();
231 client2.close().join();
232 listener.nextEvent().thenAccept(result -> {
233 assertEquals(node1, result.newValue().leaderNodeId());
234 assertEquals(1, result.newValue().candidates().size());
235 assertEquals(node1, result.newValue().candidates().get(0));
236 }).join();
237 }
238
239 @Test
240 public void testQueries() throws Throwable {
241 leaderElectorQueryTests(1);
242 clearTests();
243 leaderElectorQueryTests(2);
244 clearTests();
245 leaderElectorQueryTests(3);
246 clearTests();
247 }
248
249 private void leaderElectorQueryTests(int numServers) throws Throwable {
250 createCopycatServers(numServers);
251 Atomix client1 = createAtomixClient();
252 Atomix client2 = createAtomixClient();
253 AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
254 AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
255 elector1.run("foo", node1).join();
256 elector2.run("foo", node2).join();
257 elector2.run("bar", node2).join();
258 elector1.getElectedTopics(node1).thenAccept(result -> {
259 assertEquals(1, result.size());
260 assertTrue(result.contains("foo"));
261 }).join();
262 elector2.getElectedTopics(node1).thenAccept(result -> {
263 assertEquals(1, result.size());
264 assertTrue(result.contains("foo"));
265 }).join();
266 elector1.getLeadership("foo").thenAccept(result -> {
267 assertEquals(node1, result.leaderNodeId());
268 assertEquals(node1, result.candidates().get(0));
269 assertEquals(node2, result.candidates().get(1));
270 }).join();
271 elector2.getLeadership("foo").thenAccept(result -> {
272 assertEquals(node1, result.leaderNodeId());
273 assertEquals(node1, result.candidates().get(0));
274 assertEquals(node2, result.candidates().get(1));
275 }).join();
276 elector1.getLeadership("bar").thenAccept(result -> {
277 assertEquals(node2, result.leaderNodeId());
278 assertEquals(node2, result.candidates().get(0));
279 }).join();
280 elector2.getLeadership("bar").thenAccept(result -> {
281 assertEquals(node2, result.leaderNodeId());
282 assertEquals(node2, result.candidates().get(0));
283 }).join();
284 elector1.getLeaderships().thenAccept(result -> {
285 assertEquals(2, result.size());
286 Leadership fooLeadership = result.get("foo");
287 assertEquals(node1, fooLeadership.leaderNodeId());
288 assertEquals(node1, fooLeadership.candidates().get(0));
289 assertEquals(node2, fooLeadership.candidates().get(1));
290 Leadership barLeadership = result.get("bar");
291 assertEquals(node2, barLeadership.leaderNodeId());
292 assertEquals(node2, barLeadership.candidates().get(0));
293 }).join();
294 elector2.getLeaderships().thenAccept(result -> {
295 assertEquals(2, result.size());
296 Leadership fooLeadership = result.get("foo");
297 assertEquals(node1, fooLeadership.leaderNodeId());
298 assertEquals(node1, fooLeadership.candidates().get(0));
299 assertEquals(node2, fooLeadership.candidates().get(1));
300 Leadership barLeadership = result.get("bar");
301 assertEquals(node2, barLeadership.leaderNodeId());
302 assertEquals(node2, barLeadership.candidates().get(0));
303 }).join();
304 }
305
306 private static class LeaderEventListener implements Consumer<Change<Leadership>> {
307 Queue<Change<Leadership>> eventQueue = new LinkedList<>();
308 CompletableFuture<Change<Leadership>> pendingFuture;
309
310 @Override
311 public void accept(Change<Leadership> change) {
312 synchronized (this) {
313 if (pendingFuture != null) {
314 pendingFuture.complete(change);
315 pendingFuture = null;
316 } else {
317 eventQueue.add(change);
318 }
319 }
320 }
321
322 public boolean hasEvent() {
323 return !eventQueue.isEmpty();
324 }
325
326 public CompletableFuture<Change<Leadership>> nextEvent() {
327 synchronized (this) {
328 if (eventQueue.isEmpty()) {
329 if (pendingFuture == null) {
330 pendingFuture = new CompletableFuture<>();
331 }
332 return pendingFuture;
333 } else {
334 return CompletableFuture.completedFuture(eventQueue.poll());
335 }
336 }
337 }
338 }
339}