blob: be24d00b54183a9cd2ac8f73508636ce3cf4b786 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Jordan Halterman2bf177c2017-06-29 01:49:08 -070018import java.io.File;
19import java.io.IOException;
20import java.nio.file.FileVisitResult;
21import java.nio.file.Files;
22import java.nio.file.Path;
23import java.nio.file.Paths;
24import java.nio.file.SimpleFileVisitor;
25import java.nio.file.attribute.BasicFileAttributes;
26import java.time.Instant;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.ArrayList;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import java.util.Collections;
29import java.util.HashSet;
30import java.util.LinkedList;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080031import java.util.List;
32import java.util.concurrent.CompletableFuture;
33import java.util.concurrent.CountDownLatch;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070034import java.util.concurrent.TimeUnit;
35import java.util.function.Consumer;
36import java.util.stream.Collectors;
37
38import com.google.common.collect.Lists;
39import io.atomix.protocols.raft.RaftClient;
40import io.atomix.protocols.raft.RaftError;
41import io.atomix.protocols.raft.RaftServer;
42import io.atomix.protocols.raft.ReadConsistency;
43import io.atomix.protocols.raft.cluster.MemberId;
44import io.atomix.protocols.raft.cluster.RaftMember;
45import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
46import io.atomix.protocols.raft.event.RaftEvent;
47import io.atomix.protocols.raft.event.impl.DefaultEventType;
48import io.atomix.protocols.raft.operation.OperationType;
49import io.atomix.protocols.raft.operation.RaftOperation;
50import io.atomix.protocols.raft.operation.impl.DefaultOperationId;
51import io.atomix.protocols.raft.protocol.AppendRequest;
52import io.atomix.protocols.raft.protocol.AppendResponse;
53import io.atomix.protocols.raft.protocol.CloseSessionRequest;
54import io.atomix.protocols.raft.protocol.CloseSessionResponse;
55import io.atomix.protocols.raft.protocol.CommandRequest;
56import io.atomix.protocols.raft.protocol.CommandResponse;
57import io.atomix.protocols.raft.protocol.ConfigureRequest;
58import io.atomix.protocols.raft.protocol.ConfigureResponse;
59import io.atomix.protocols.raft.protocol.InstallRequest;
60import io.atomix.protocols.raft.protocol.InstallResponse;
61import io.atomix.protocols.raft.protocol.JoinRequest;
62import io.atomix.protocols.raft.protocol.JoinResponse;
63import io.atomix.protocols.raft.protocol.KeepAliveRequest;
64import io.atomix.protocols.raft.protocol.KeepAliveResponse;
65import io.atomix.protocols.raft.protocol.LeaveRequest;
66import io.atomix.protocols.raft.protocol.LeaveResponse;
67import io.atomix.protocols.raft.protocol.MetadataRequest;
68import io.atomix.protocols.raft.protocol.MetadataResponse;
69import io.atomix.protocols.raft.protocol.OpenSessionRequest;
70import io.atomix.protocols.raft.protocol.OpenSessionResponse;
71import io.atomix.protocols.raft.protocol.PollRequest;
72import io.atomix.protocols.raft.protocol.PollResponse;
73import io.atomix.protocols.raft.protocol.PublishRequest;
74import io.atomix.protocols.raft.protocol.QueryRequest;
75import io.atomix.protocols.raft.protocol.QueryResponse;
76import io.atomix.protocols.raft.protocol.RaftResponse;
77import io.atomix.protocols.raft.protocol.ReconfigureRequest;
78import io.atomix.protocols.raft.protocol.ReconfigureResponse;
79import io.atomix.protocols.raft.protocol.ResetRequest;
80import io.atomix.protocols.raft.protocol.VoteRequest;
81import io.atomix.protocols.raft.protocol.VoteResponse;
82import io.atomix.protocols.raft.proxy.CommunicationStrategy;
83import io.atomix.protocols.raft.proxy.RaftProxy;
84import io.atomix.protocols.raft.service.RaftService;
85import io.atomix.protocols.raft.session.SessionId;
86import io.atomix.protocols.raft.storage.RaftStorage;
87import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
88import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
89import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
90import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
91import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
92import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
93import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
94import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
95import io.atomix.protocols.raft.storage.system.Configuration;
96import io.atomix.storage.StorageLevel;
97import org.junit.After;
98import org.junit.Before;
99import org.onlab.util.KryoNamespace;
100import org.onosproject.cluster.NodeId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700101import org.onosproject.store.primitives.impl.RaftClientCommunicator;
102import org.onosproject.store.primitives.impl.RaftServerCommunicator;
103import org.onosproject.store.service.Serializer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800104
105/**
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700106 * Base class for various Atomix tests.
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700107 *
108 * @param <T> the Raft primitive type being tested
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800109 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700110public abstract class AtomixTestBase<T extends AbstractRaftPrimitive> {
111
112 private static final Serializer PROTOCOL_SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
113 .register(OpenSessionRequest.class)
114 .register(OpenSessionResponse.class)
115 .register(CloseSessionRequest.class)
116 .register(CloseSessionResponse.class)
117 .register(KeepAliveRequest.class)
118 .register(KeepAliveResponse.class)
119 .register(QueryRequest.class)
120 .register(QueryResponse.class)
121 .register(CommandRequest.class)
122 .register(CommandResponse.class)
123 .register(MetadataRequest.class)
124 .register(MetadataResponse.class)
125 .register(JoinRequest.class)
126 .register(JoinResponse.class)
127 .register(LeaveRequest.class)
128 .register(LeaveResponse.class)
129 .register(ConfigureRequest.class)
130 .register(ConfigureResponse.class)
131 .register(ReconfigureRequest.class)
132 .register(ReconfigureResponse.class)
133 .register(InstallRequest.class)
134 .register(InstallResponse.class)
135 .register(PollRequest.class)
136 .register(PollResponse.class)
137 .register(VoteRequest.class)
138 .register(VoteResponse.class)
139 .register(AppendRequest.class)
140 .register(AppendResponse.class)
141 .register(PublishRequest.class)
142 .register(ResetRequest.class)
143 .register(RaftResponse.Status.class)
144 .register(RaftError.class)
145 .register(RaftError.Type.class)
146 .register(ReadConsistency.class)
147 .register(byte[].class)
148 .register(long[].class)
149 .register(CloseSessionEntry.class)
150 .register(CommandEntry.class)
151 .register(ConfigurationEntry.class)
152 .register(InitializeEntry.class)
153 .register(KeepAliveEntry.class)
154 .register(MetadataEntry.class)
155 .register(OpenSessionEntry.class)
156 .register(QueryEntry.class)
157 .register(RaftOperation.class)
158 .register(RaftEvent.class)
159 .register(DefaultEventType.class)
160 .register(DefaultOperationId.class)
161 .register(OperationType.class)
162 .register(ReadConsistency.class)
163 .register(ArrayList.class)
164 .register(LinkedList.class)
165 .register(Collections.emptyList().getClass())
166 .register(HashSet.class)
167 .register(DefaultRaftMember.class)
168 .register(MemberId.class)
169 .register(SessionId.class)
170 .register(RaftMember.Type.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700171 .register(Instant.class)
172 .register(Configuration.class)
173 .register(AtomixAtomicCounterMapOperations.class)
174 .register(AtomixConsistentMapEvents.class)
175 .register(AtomixConsistentMapOperations.class)
176 .register(AtomixConsistentSetMultimapOperations.class)
177 .register(AtomixConsistentSetMultimapEvents.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700178 .register(AtomixConsistentTreeMapOperations.class)
179 .register(AtomixCounterOperations.class)
180 .register(AtomixDocumentTreeEvents.class)
181 .register(AtomixDocumentTreeOperations.class)
182 .register(AtomixLeaderElectorEvents.class)
183 .register(AtomixLeaderElectorOperations.class)
184 .register(AtomixWorkQueueEvents.class)
185 .register(AtomixWorkQueueOperations.class)
186 .build());
187
188 private static final Serializer STORAGE_SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
189 .register(CloseSessionEntry.class)
190 .register(CommandEntry.class)
191 .register(ConfigurationEntry.class)
192 .register(InitializeEntry.class)
193 .register(KeepAliveEntry.class)
194 .register(MetadataEntry.class)
195 .register(OpenSessionEntry.class)
196 .register(QueryEntry.class)
197 .register(RaftOperation.class)
198 .register(ReadConsistency.class)
199 .register(AtomixAtomicCounterMapOperations.class)
200 .register(AtomixConsistentMapOperations.class)
201 .register(AtomixConsistentSetMultimapOperations.class)
202 .register(AtomixConsistentTreeMapOperations.class)
203 .register(AtomixCounterOperations.class)
204 .register(AtomixDocumentTreeOperations.class)
205 .register(AtomixLeaderElectorOperations.class)
206 .register(AtomixWorkQueueOperations.class)
207 .register(ArrayList.class)
208 .register(HashSet.class)
209 .register(DefaultRaftMember.class)
210 .register(MemberId.class)
211 .register(RaftMember.Type.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700212 .register(Instant.class)
213 .register(Configuration.class)
214 .register(byte[].class)
215 .register(long[].class)
216 .build());
217
218 protected TestClusterCommunicationServiceFactory communicationServiceFactory;
219 protected List<RaftMember> members = Lists.newCopyOnWriteArrayList();
220 protected List<RaftClient> clients = Lists.newCopyOnWriteArrayList();
221 protected List<RaftServer> servers = Lists.newCopyOnWriteArrayList();
222 protected int nextId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800223
224 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700225 * Creates the primitive service.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800226 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700227 * @return the primitive service
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800228 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700229 protected abstract RaftService createService();
230
231 /**
232 * Creates a new primitive.
233 *
234 * @param name the primitive name
235 * @return the primitive instance
236 */
237 protected T newPrimitive(String name) {
238 RaftClient client = createClient();
239 RaftProxy proxy = client.newProxyBuilder()
240 .withName(name)
241 .withServiceType("test")
242 .withReadConsistency(readConsistency())
243 .withCommunicationStrategy(communicationStrategy())
244 .build()
245 .open()
246 .join();
247 return createPrimitive(proxy);
248 }
249
250 /**
251 * Creates a new primitive instance.
252 *
253 * @param proxy the primitive proxy
254 * @return the primitive instance
255 */
256 protected abstract T createPrimitive(RaftProxy proxy);
257
258 /**
259 * Returns the proxy read consistency.
260 *
261 * @return the primitive read consistency
262 */
263 protected ReadConsistency readConsistency() {
264 return ReadConsistency.LINEARIZABLE;
265 }
266
267 /**
268 * Returns the proxy communication strategy.
269 *
270 * @return the primitive communication strategy
271 */
272 protected CommunicationStrategy communicationStrategy() {
273 return CommunicationStrategy.LEADER;
274 }
275
276 @Before
277 public void prepare() {
278 members.clear();
279 clients.clear();
280 servers.clear();
281 communicationServiceFactory = new TestClusterCommunicationServiceFactory();
282 createServers(3);
283 }
284
285 @After
286 public void cleanup() {
287 shutdown();
288 }
289
290 /**
291 * Shuts down clients and servers.
292 */
293 private void shutdown() {
294 clients.forEach(c -> {
295 try {
296 c.close().get(10, TimeUnit.SECONDS);
297 } catch (Exception e) {
298 }
299 });
300
301 servers.forEach(s -> {
302 try {
303 if (s.isRunning()) {
304 s.shutdown().get(10, TimeUnit.SECONDS);
305 }
306 } catch (Exception e) {
307 }
308 });
309
310 Path directory = Paths.get("target/primitives/");
311 if (Files.exists(directory)) {
312 try {
313 Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
314 @Override
315 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
316 Files.delete(file);
317 return FileVisitResult.CONTINUE;
318 }
319
320 @Override
321 public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
322 Files.delete(dir);
323 return FileVisitResult.CONTINUE;
324 }
325 });
326 } catch (IOException e) {
327 }
328 }
329 }
330
331 /**
332 * Returns the next unique member identifier.
333 *
334 * @return The next unique member identifier.
335 */
336 private MemberId nextMemberId() {
337 return MemberId.from(String.valueOf(++nextId));
338 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800339
340 /**
341 * Returns the next server address.
342 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700343 * @param type The startup member type.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800344 * @return The next server address.
345 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700346 private RaftMember nextMember(RaftMember.Type type) {
347 return new TestMember(nextMemberId(), type);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800348 }
349
350 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700351 * Creates a set of Raft servers.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800352 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700353 protected List<RaftServer> createServers(int nodes) {
354 List<RaftServer> servers = new ArrayList<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800355
356 for (int i = 0; i < nodes; i++) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700357 members.add(nextMember(RaftMember.Type.ACTIVE));
358 }
359
360 CountDownLatch latch = new CountDownLatch(nodes);
361 for (int i = 0; i < nodes; i++) {
362 RaftServer server = createServer(members.get(i));
363 server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList()))
364 .thenRun(latch::countDown);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800365 servers.add(server);
366 }
367
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700368 try {
369 latch.await(30000, TimeUnit.MILLISECONDS);
370 } catch (InterruptedException e) {
371 throw new RuntimeException(e);
372 }
373
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800374 return servers;
375 }
376
377 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700378 * Creates a Raft server.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800379 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700380 private RaftServer createServer(RaftMember member) {
381 RaftServer.Builder builder = RaftServer.newBuilder(member.memberId())
382 .withType(member.getType())
383 .withProtocol(new RaftServerCommunicator(
Jordan Halterman980a8c12017-09-22 18:01:19 -0700384 "partition-1",
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700385 PROTOCOL_SERIALIZER,
386 communicationServiceFactory.newCommunicationService(NodeId.nodeId(member.memberId().id()))))
387 .withStorage(RaftStorage.newBuilder()
388 .withStorageLevel(StorageLevel.MEMORY)
389 .withDirectory(new File(String.format("target/primitives/%s", member.memberId())))
390 .withSerializer(new AtomixSerializerAdapter(STORAGE_SERIALIZER))
391 .withMaxSegmentSize(1024 * 1024)
392 .build())
393 .addService("test", this::createService);
394
395 RaftServer server = builder.build();
396 servers.add(server);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800397 return server;
398 }
399
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700400 /**
401 * Creates a Raft client.
402 */
403 private RaftClient createClient() {
404 MemberId memberId = nextMemberId();
405 RaftClient client = RaftClient.newBuilder()
406 .withMemberId(memberId)
407 .withProtocol(new RaftClientCommunicator(
Jordan Halterman980a8c12017-09-22 18:01:19 -0700408 "partition-1",
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700409 PROTOCOL_SERIALIZER,
410 communicationServiceFactory.newCommunicationService(NodeId.nodeId(memberId.id()))))
411 .build();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800412
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700413 client.connect(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).join();
414 clients.add(client);
415 return client;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800416 }
417
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800418 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700419 * Test member.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800420 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700421 public static class TestMember implements RaftMember {
422 private final MemberId memberId;
423 private final Type type;
424
425 public TestMember(MemberId memberId, Type type) {
426 this.memberId = memberId;
427 this.type = type;
428 }
429
430 @Override
431 public MemberId memberId() {
432 return memberId;
433 }
434
435 @Override
436 public int hash() {
437 return memberId.hashCode();
438 }
439
440 @Override
441 public Type getType() {
442 return type;
443 }
444
445 @Override
446 public void addTypeChangeListener(Consumer<Type> listener) {
447
448 }
449
450 @Override
451 public void removeTypeChangeListener(Consumer<Type> listener) {
452
453 }
454
455 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700456 public Instant getLastUpdated() {
457 return Instant.now();
458 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700459 @Override
460 public CompletableFuture<Void> promote() {
461 return null;
462 }
463
464 @Override
465 public CompletableFuture<Void> promote(Type type) {
466 return null;
467 }
468
469 @Override
470 public CompletableFuture<Void> demote() {
471 return null;
472 }
473
474 @Override
475 public CompletableFuture<Void> demote(Type type) {
476 return null;
477 }
478
479 @Override
480 public CompletableFuture<Void> remove() {
481 return null;
482 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800483 }
484}