blob: 7073bb501ab7eb842ee380ed1cb9ee87f16e9aa0 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Jordan Halterman2bf177c2017-06-29 01:49:08 -070018import java.io.File;
19import java.io.IOException;
20import java.nio.file.FileVisitResult;
21import java.nio.file.Files;
22import java.nio.file.Path;
23import java.nio.file.Paths;
24import java.nio.file.SimpleFileVisitor;
25import java.nio.file.attribute.BasicFileAttributes;
26import java.time.Instant;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.ArrayList;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import java.util.Collections;
29import java.util.HashSet;
30import java.util.LinkedList;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080031import java.util.List;
32import java.util.concurrent.CompletableFuture;
33import java.util.concurrent.CountDownLatch;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070034import java.util.concurrent.TimeUnit;
35import java.util.function.Consumer;
36import java.util.stream.Collectors;
37
38import com.google.common.collect.Lists;
39import io.atomix.protocols.raft.RaftClient;
40import io.atomix.protocols.raft.RaftError;
41import io.atomix.protocols.raft.RaftServer;
42import io.atomix.protocols.raft.ReadConsistency;
43import io.atomix.protocols.raft.cluster.MemberId;
44import io.atomix.protocols.raft.cluster.RaftMember;
45import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
46import io.atomix.protocols.raft.event.RaftEvent;
47import io.atomix.protocols.raft.event.impl.DefaultEventType;
48import io.atomix.protocols.raft.operation.OperationType;
49import io.atomix.protocols.raft.operation.RaftOperation;
50import io.atomix.protocols.raft.operation.impl.DefaultOperationId;
51import io.atomix.protocols.raft.protocol.AppendRequest;
52import io.atomix.protocols.raft.protocol.AppendResponse;
53import io.atomix.protocols.raft.protocol.CloseSessionRequest;
54import io.atomix.protocols.raft.protocol.CloseSessionResponse;
55import io.atomix.protocols.raft.protocol.CommandRequest;
56import io.atomix.protocols.raft.protocol.CommandResponse;
57import io.atomix.protocols.raft.protocol.ConfigureRequest;
58import io.atomix.protocols.raft.protocol.ConfigureResponse;
59import io.atomix.protocols.raft.protocol.InstallRequest;
60import io.atomix.protocols.raft.protocol.InstallResponse;
61import io.atomix.protocols.raft.protocol.JoinRequest;
62import io.atomix.protocols.raft.protocol.JoinResponse;
63import io.atomix.protocols.raft.protocol.KeepAliveRequest;
64import io.atomix.protocols.raft.protocol.KeepAliveResponse;
65import io.atomix.protocols.raft.protocol.LeaveRequest;
66import io.atomix.protocols.raft.protocol.LeaveResponse;
67import io.atomix.protocols.raft.protocol.MetadataRequest;
68import io.atomix.protocols.raft.protocol.MetadataResponse;
69import io.atomix.protocols.raft.protocol.OpenSessionRequest;
70import io.atomix.protocols.raft.protocol.OpenSessionResponse;
71import io.atomix.protocols.raft.protocol.PollRequest;
72import io.atomix.protocols.raft.protocol.PollResponse;
73import io.atomix.protocols.raft.protocol.PublishRequest;
74import io.atomix.protocols.raft.protocol.QueryRequest;
75import io.atomix.protocols.raft.protocol.QueryResponse;
76import io.atomix.protocols.raft.protocol.RaftResponse;
77import io.atomix.protocols.raft.protocol.ReconfigureRequest;
78import io.atomix.protocols.raft.protocol.ReconfigureResponse;
79import io.atomix.protocols.raft.protocol.ResetRequest;
80import io.atomix.protocols.raft.protocol.VoteRequest;
81import io.atomix.protocols.raft.protocol.VoteResponse;
82import io.atomix.protocols.raft.proxy.CommunicationStrategy;
83import io.atomix.protocols.raft.proxy.RaftProxy;
84import io.atomix.protocols.raft.service.RaftService;
85import io.atomix.protocols.raft.session.SessionId;
86import io.atomix.protocols.raft.storage.RaftStorage;
87import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
88import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
89import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
90import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
91import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
92import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
93import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
94import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
95import io.atomix.protocols.raft.storage.system.Configuration;
96import io.atomix.storage.StorageLevel;
97import org.junit.After;
98import org.junit.Before;
99import org.onlab.util.KryoNamespace;
100import org.onosproject.cluster.NodeId;
101import org.onosproject.cluster.PartitionId;
102import org.onosproject.store.primitives.impl.RaftClientCommunicator;
103import org.onosproject.store.primitives.impl.RaftServerCommunicator;
104import org.onosproject.store.service.Serializer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800105
/**
 * Base class for various Atomix tests.
 *
 * @param <T> the Raft primitive type being tested
 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700111public abstract class AtomixTestBase<T extends AbstractRaftPrimitive> {
112
113 private static final Serializer PROTOCOL_SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
114 .register(OpenSessionRequest.class)
115 .register(OpenSessionResponse.class)
116 .register(CloseSessionRequest.class)
117 .register(CloseSessionResponse.class)
118 .register(KeepAliveRequest.class)
119 .register(KeepAliveResponse.class)
120 .register(QueryRequest.class)
121 .register(QueryResponse.class)
122 .register(CommandRequest.class)
123 .register(CommandResponse.class)
124 .register(MetadataRequest.class)
125 .register(MetadataResponse.class)
126 .register(JoinRequest.class)
127 .register(JoinResponse.class)
128 .register(LeaveRequest.class)
129 .register(LeaveResponse.class)
130 .register(ConfigureRequest.class)
131 .register(ConfigureResponse.class)
132 .register(ReconfigureRequest.class)
133 .register(ReconfigureResponse.class)
134 .register(InstallRequest.class)
135 .register(InstallResponse.class)
136 .register(PollRequest.class)
137 .register(PollResponse.class)
138 .register(VoteRequest.class)
139 .register(VoteResponse.class)
140 .register(AppendRequest.class)
141 .register(AppendResponse.class)
142 .register(PublishRequest.class)
143 .register(ResetRequest.class)
144 .register(RaftResponse.Status.class)
145 .register(RaftError.class)
146 .register(RaftError.Type.class)
147 .register(ReadConsistency.class)
148 .register(byte[].class)
149 .register(long[].class)
150 .register(CloseSessionEntry.class)
151 .register(CommandEntry.class)
152 .register(ConfigurationEntry.class)
153 .register(InitializeEntry.class)
154 .register(KeepAliveEntry.class)
155 .register(MetadataEntry.class)
156 .register(OpenSessionEntry.class)
157 .register(QueryEntry.class)
158 .register(RaftOperation.class)
159 .register(RaftEvent.class)
160 .register(DefaultEventType.class)
161 .register(DefaultOperationId.class)
162 .register(OperationType.class)
163 .register(ReadConsistency.class)
164 .register(ArrayList.class)
165 .register(LinkedList.class)
166 .register(Collections.emptyList().getClass())
167 .register(HashSet.class)
168 .register(DefaultRaftMember.class)
169 .register(MemberId.class)
170 .register(SessionId.class)
171 .register(RaftMember.Type.class)
172 .register(RaftMember.Status.class)
173 .register(Instant.class)
174 .register(Configuration.class)
175 .register(AtomixAtomicCounterMapOperations.class)
176 .register(AtomixConsistentMapEvents.class)
177 .register(AtomixConsistentMapOperations.class)
178 .register(AtomixConsistentSetMultimapOperations.class)
179 .register(AtomixConsistentSetMultimapEvents.class)
180 .register(AtomixConsistentTreeMapEvents.class)
181 .register(AtomixConsistentTreeMapOperations.class)
182 .register(AtomixCounterOperations.class)
183 .register(AtomixDocumentTreeEvents.class)
184 .register(AtomixDocumentTreeOperations.class)
185 .register(AtomixLeaderElectorEvents.class)
186 .register(AtomixLeaderElectorOperations.class)
187 .register(AtomixWorkQueueEvents.class)
188 .register(AtomixWorkQueueOperations.class)
189 .build());
190
191 private static final Serializer STORAGE_SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
192 .register(CloseSessionEntry.class)
193 .register(CommandEntry.class)
194 .register(ConfigurationEntry.class)
195 .register(InitializeEntry.class)
196 .register(KeepAliveEntry.class)
197 .register(MetadataEntry.class)
198 .register(OpenSessionEntry.class)
199 .register(QueryEntry.class)
200 .register(RaftOperation.class)
201 .register(ReadConsistency.class)
202 .register(AtomixAtomicCounterMapOperations.class)
203 .register(AtomixConsistentMapOperations.class)
204 .register(AtomixConsistentSetMultimapOperations.class)
205 .register(AtomixConsistentTreeMapOperations.class)
206 .register(AtomixCounterOperations.class)
207 .register(AtomixDocumentTreeOperations.class)
208 .register(AtomixLeaderElectorOperations.class)
209 .register(AtomixWorkQueueOperations.class)
210 .register(ArrayList.class)
211 .register(HashSet.class)
212 .register(DefaultRaftMember.class)
213 .register(MemberId.class)
214 .register(RaftMember.Type.class)
215 .register(RaftMember.Status.class)
216 .register(Instant.class)
217 .register(Configuration.class)
218 .register(byte[].class)
219 .register(long[].class)
220 .build());
221
222 protected TestClusterCommunicationServiceFactory communicationServiceFactory;
223 protected List<RaftMember> members = Lists.newCopyOnWriteArrayList();
224 protected List<RaftClient> clients = Lists.newCopyOnWriteArrayList();
225 protected List<RaftServer> servers = Lists.newCopyOnWriteArrayList();
226 protected int nextId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800227
228 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700229 * Creates the primitive service.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800230 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700231 * @return the primitive service
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800232 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700233 protected abstract RaftService createService();
234
235 /**
236 * Creates a new primitive.
237 *
238 * @param name the primitive name
239 * @return the primitive instance
240 */
241 protected T newPrimitive(String name) {
242 RaftClient client = createClient();
243 RaftProxy proxy = client.newProxyBuilder()
244 .withName(name)
245 .withServiceType("test")
246 .withReadConsistency(readConsistency())
247 .withCommunicationStrategy(communicationStrategy())
248 .build()
249 .open()
250 .join();
251 return createPrimitive(proxy);
252 }
253
254 /**
255 * Creates a new primitive instance.
256 *
257 * @param proxy the primitive proxy
258 * @return the primitive instance
259 */
260 protected abstract T createPrimitive(RaftProxy proxy);
261
262 /**
263 * Returns the proxy read consistency.
264 *
265 * @return the primitive read consistency
266 */
267 protected ReadConsistency readConsistency() {
268 return ReadConsistency.LINEARIZABLE;
269 }
270
271 /**
272 * Returns the proxy communication strategy.
273 *
274 * @return the primitive communication strategy
275 */
276 protected CommunicationStrategy communicationStrategy() {
277 return CommunicationStrategy.LEADER;
278 }
279
280 @Before
281 public void prepare() {
282 members.clear();
283 clients.clear();
284 servers.clear();
285 communicationServiceFactory = new TestClusterCommunicationServiceFactory();
286 createServers(3);
287 }
288
289 @After
290 public void cleanup() {
291 shutdown();
292 }
293
294 /**
295 * Shuts down clients and servers.
296 */
297 private void shutdown() {
298 clients.forEach(c -> {
299 try {
300 c.close().get(10, TimeUnit.SECONDS);
301 } catch (Exception e) {
302 }
303 });
304
305 servers.forEach(s -> {
306 try {
307 if (s.isRunning()) {
308 s.shutdown().get(10, TimeUnit.SECONDS);
309 }
310 } catch (Exception e) {
311 }
312 });
313
314 Path directory = Paths.get("target/primitives/");
315 if (Files.exists(directory)) {
316 try {
317 Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
318 @Override
319 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
320 Files.delete(file);
321 return FileVisitResult.CONTINUE;
322 }
323
324 @Override
325 public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
326 Files.delete(dir);
327 return FileVisitResult.CONTINUE;
328 }
329 });
330 } catch (IOException e) {
331 }
332 }
333 }
334
335 /**
336 * Returns the next unique member identifier.
337 *
338 * @return The next unique member identifier.
339 */
340 private MemberId nextMemberId() {
341 return MemberId.from(String.valueOf(++nextId));
342 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800343
344 /**
345 * Returns the next server address.
346 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700347 * @param type The startup member type.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800348 * @return The next server address.
349 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700350 private RaftMember nextMember(RaftMember.Type type) {
351 return new TestMember(nextMemberId(), type);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800352 }
353
354 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700355 * Creates a set of Raft servers.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800356 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700357 protected List<RaftServer> createServers(int nodes) {
358 List<RaftServer> servers = new ArrayList<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800359
360 for (int i = 0; i < nodes; i++) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700361 members.add(nextMember(RaftMember.Type.ACTIVE));
362 }
363
364 CountDownLatch latch = new CountDownLatch(nodes);
365 for (int i = 0; i < nodes; i++) {
366 RaftServer server = createServer(members.get(i));
367 server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList()))
368 .thenRun(latch::countDown);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800369 servers.add(server);
370 }
371
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700372 try {
373 latch.await(30000, TimeUnit.MILLISECONDS);
374 } catch (InterruptedException e) {
375 throw new RuntimeException(e);
376 }
377
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800378 return servers;
379 }
380
381 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700382 * Creates a Raft server.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800383 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700384 private RaftServer createServer(RaftMember member) {
385 RaftServer.Builder builder = RaftServer.newBuilder(member.memberId())
386 .withType(member.getType())
387 .withProtocol(new RaftServerCommunicator(
388 PartitionId.from(1),
389 PROTOCOL_SERIALIZER,
390 communicationServiceFactory.newCommunicationService(NodeId.nodeId(member.memberId().id()))))
391 .withStorage(RaftStorage.newBuilder()
392 .withStorageLevel(StorageLevel.MEMORY)
393 .withDirectory(new File(String.format("target/primitives/%s", member.memberId())))
394 .withSerializer(new AtomixSerializerAdapter(STORAGE_SERIALIZER))
395 .withMaxSegmentSize(1024 * 1024)
396 .build())
397 .addService("test", this::createService);
398
399 RaftServer server = builder.build();
400 servers.add(server);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800401 return server;
402 }
403
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700404 /**
405 * Creates a Raft client.
406 */
407 private RaftClient createClient() {
408 MemberId memberId = nextMemberId();
409 RaftClient client = RaftClient.newBuilder()
410 .withMemberId(memberId)
411 .withProtocol(new RaftClientCommunicator(
412 PartitionId.from(1),
413 PROTOCOL_SERIALIZER,
414 communicationServiceFactory.newCommunicationService(NodeId.nodeId(memberId.id()))))
415 .build();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800416
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700417 client.connect(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).join();
418 clients.add(client);
419 return client;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800420 }
421
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800422 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700423 * Test member.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800424 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700425 public static class TestMember implements RaftMember {
426 private final MemberId memberId;
427 private final Type type;
428
429 public TestMember(MemberId memberId, Type type) {
430 this.memberId = memberId;
431 this.type = type;
432 }
433
434 @Override
435 public MemberId memberId() {
436 return memberId;
437 }
438
439 @Override
440 public int hash() {
441 return memberId.hashCode();
442 }
443
444 @Override
445 public Type getType() {
446 return type;
447 }
448
449 @Override
450 public void addTypeChangeListener(Consumer<Type> listener) {
451
452 }
453
454 @Override
455 public void removeTypeChangeListener(Consumer<Type> listener) {
456
457 }
458
459 @Override
460 public Status getStatus() {
461 return Status.AVAILABLE;
462 }
463
464 @Override
465 public Instant getLastUpdated() {
466 return Instant.now();
467 }
468
469 @Override
470 public void addStatusChangeListener(Consumer<Status> listener) {
471
472 }
473
474 @Override
475 public void removeStatusChangeListener(Consumer<Status> listener) {
476
477 }
478
479 @Override
480 public CompletableFuture<Void> promote() {
481 return null;
482 }
483
484 @Override
485 public CompletableFuture<Void> promote(Type type) {
486 return null;
487 }
488
489 @Override
490 public CompletableFuture<Void> demote() {
491 return null;
492 }
493
494 @Override
495 public CompletableFuture<Void> demote(Type type) {
496 return null;
497 }
498
499 @Override
500 public CompletableFuture<Void> remove() {
501 return null;
502 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800503 }
504}