blob: 1ade93185c0257cddae30e92bbe4e8c8382519cc [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Jordan Halterman2bf177c2017-06-29 01:49:08 -070018import java.io.File;
19import java.io.IOException;
20import java.nio.file.FileVisitResult;
21import java.nio.file.Files;
22import java.nio.file.Path;
23import java.nio.file.Paths;
24import java.nio.file.SimpleFileVisitor;
25import java.nio.file.attribute.BasicFileAttributes;
26import java.time.Instant;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.ArrayList;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import java.util.Collections;
29import java.util.HashSet;
30import java.util.LinkedList;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080031import java.util.List;
32import java.util.concurrent.CompletableFuture;
33import java.util.concurrent.CountDownLatch;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070034import java.util.concurrent.TimeUnit;
35import java.util.function.Consumer;
36import java.util.stream.Collectors;
37
38import com.google.common.collect.Lists;
39import io.atomix.protocols.raft.RaftClient;
40import io.atomix.protocols.raft.RaftError;
41import io.atomix.protocols.raft.RaftServer;
42import io.atomix.protocols.raft.ReadConsistency;
43import io.atomix.protocols.raft.cluster.MemberId;
44import io.atomix.protocols.raft.cluster.RaftMember;
45import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
46import io.atomix.protocols.raft.event.RaftEvent;
47import io.atomix.protocols.raft.event.impl.DefaultEventType;
48import io.atomix.protocols.raft.operation.OperationType;
49import io.atomix.protocols.raft.operation.RaftOperation;
50import io.atomix.protocols.raft.operation.impl.DefaultOperationId;
51import io.atomix.protocols.raft.protocol.AppendRequest;
52import io.atomix.protocols.raft.protocol.AppendResponse;
53import io.atomix.protocols.raft.protocol.CloseSessionRequest;
54import io.atomix.protocols.raft.protocol.CloseSessionResponse;
55import io.atomix.protocols.raft.protocol.CommandRequest;
56import io.atomix.protocols.raft.protocol.CommandResponse;
57import io.atomix.protocols.raft.protocol.ConfigureRequest;
58import io.atomix.protocols.raft.protocol.ConfigureResponse;
Jordan Halterman19486e32017-11-02 15:00:06 -070059import io.atomix.protocols.raft.protocol.HeartbeatRequest;
60import io.atomix.protocols.raft.protocol.HeartbeatResponse;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070061import io.atomix.protocols.raft.protocol.InstallRequest;
62import io.atomix.protocols.raft.protocol.InstallResponse;
63import io.atomix.protocols.raft.protocol.JoinRequest;
64import io.atomix.protocols.raft.protocol.JoinResponse;
65import io.atomix.protocols.raft.protocol.KeepAliveRequest;
66import io.atomix.protocols.raft.protocol.KeepAliveResponse;
67import io.atomix.protocols.raft.protocol.LeaveRequest;
68import io.atomix.protocols.raft.protocol.LeaveResponse;
69import io.atomix.protocols.raft.protocol.MetadataRequest;
70import io.atomix.protocols.raft.protocol.MetadataResponse;
71import io.atomix.protocols.raft.protocol.OpenSessionRequest;
72import io.atomix.protocols.raft.protocol.OpenSessionResponse;
73import io.atomix.protocols.raft.protocol.PollRequest;
74import io.atomix.protocols.raft.protocol.PollResponse;
75import io.atomix.protocols.raft.protocol.PublishRequest;
76import io.atomix.protocols.raft.protocol.QueryRequest;
77import io.atomix.protocols.raft.protocol.QueryResponse;
78import io.atomix.protocols.raft.protocol.RaftResponse;
79import io.atomix.protocols.raft.protocol.ReconfigureRequest;
80import io.atomix.protocols.raft.protocol.ReconfigureResponse;
81import io.atomix.protocols.raft.protocol.ResetRequest;
82import io.atomix.protocols.raft.protocol.VoteRequest;
83import io.atomix.protocols.raft.protocol.VoteResponse;
84import io.atomix.protocols.raft.proxy.CommunicationStrategy;
85import io.atomix.protocols.raft.proxy.RaftProxy;
86import io.atomix.protocols.raft.service.RaftService;
87import io.atomix.protocols.raft.session.SessionId;
88import io.atomix.protocols.raft.storage.RaftStorage;
89import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
90import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
91import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
92import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
93import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
94import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
95import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
96import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
97import io.atomix.protocols.raft.storage.system.Configuration;
98import io.atomix.storage.StorageLevel;
99import org.junit.After;
100import org.junit.Before;
101import org.onlab.util.KryoNamespace;
102import org.onosproject.cluster.NodeId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700103import org.onosproject.store.primitives.impl.RaftClientCommunicator;
104import org.onosproject.store.primitives.impl.RaftServerCommunicator;
105import org.onosproject.store.service.Serializer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800106
107/**
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -0700108 * Base class for various Atomix tests.
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700109 *
110 * @param <T> the Raft primitive type being tested
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800111 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700112public abstract class AtomixTestBase<T extends AbstractRaftPrimitive> {
113
114 private static final Serializer PROTOCOL_SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
115 .register(OpenSessionRequest.class)
116 .register(OpenSessionResponse.class)
117 .register(CloseSessionRequest.class)
118 .register(CloseSessionResponse.class)
119 .register(KeepAliveRequest.class)
120 .register(KeepAliveResponse.class)
Jordan Halterman19486e32017-11-02 15:00:06 -0700121 .register(HeartbeatRequest.class)
122 .register(HeartbeatResponse.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700123 .register(QueryRequest.class)
124 .register(QueryResponse.class)
125 .register(CommandRequest.class)
126 .register(CommandResponse.class)
127 .register(MetadataRequest.class)
128 .register(MetadataResponse.class)
129 .register(JoinRequest.class)
130 .register(JoinResponse.class)
131 .register(LeaveRequest.class)
132 .register(LeaveResponse.class)
133 .register(ConfigureRequest.class)
134 .register(ConfigureResponse.class)
135 .register(ReconfigureRequest.class)
136 .register(ReconfigureResponse.class)
137 .register(InstallRequest.class)
138 .register(InstallResponse.class)
139 .register(PollRequest.class)
140 .register(PollResponse.class)
141 .register(VoteRequest.class)
142 .register(VoteResponse.class)
143 .register(AppendRequest.class)
144 .register(AppendResponse.class)
145 .register(PublishRequest.class)
146 .register(ResetRequest.class)
147 .register(RaftResponse.Status.class)
148 .register(RaftError.class)
149 .register(RaftError.Type.class)
150 .register(ReadConsistency.class)
151 .register(byte[].class)
152 .register(long[].class)
153 .register(CloseSessionEntry.class)
154 .register(CommandEntry.class)
155 .register(ConfigurationEntry.class)
156 .register(InitializeEntry.class)
157 .register(KeepAliveEntry.class)
158 .register(MetadataEntry.class)
159 .register(OpenSessionEntry.class)
160 .register(QueryEntry.class)
161 .register(RaftOperation.class)
162 .register(RaftEvent.class)
163 .register(DefaultEventType.class)
164 .register(DefaultOperationId.class)
165 .register(OperationType.class)
166 .register(ReadConsistency.class)
167 .register(ArrayList.class)
168 .register(LinkedList.class)
169 .register(Collections.emptyList().getClass())
170 .register(HashSet.class)
171 .register(DefaultRaftMember.class)
172 .register(MemberId.class)
173 .register(SessionId.class)
174 .register(RaftMember.Type.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700175 .register(Instant.class)
176 .register(Configuration.class)
177 .register(AtomixAtomicCounterMapOperations.class)
178 .register(AtomixConsistentMapEvents.class)
179 .register(AtomixConsistentMapOperations.class)
180 .register(AtomixConsistentSetMultimapOperations.class)
181 .register(AtomixConsistentSetMultimapEvents.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700182 .register(AtomixConsistentTreeMapOperations.class)
183 .register(AtomixCounterOperations.class)
184 .register(AtomixDocumentTreeEvents.class)
185 .register(AtomixDocumentTreeOperations.class)
186 .register(AtomixLeaderElectorEvents.class)
187 .register(AtomixLeaderElectorOperations.class)
188 .register(AtomixWorkQueueEvents.class)
189 .register(AtomixWorkQueueOperations.class)
190 .build());
191
192 private static final Serializer STORAGE_SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
193 .register(CloseSessionEntry.class)
194 .register(CommandEntry.class)
195 .register(ConfigurationEntry.class)
196 .register(InitializeEntry.class)
197 .register(KeepAliveEntry.class)
198 .register(MetadataEntry.class)
199 .register(OpenSessionEntry.class)
200 .register(QueryEntry.class)
201 .register(RaftOperation.class)
202 .register(ReadConsistency.class)
203 .register(AtomixAtomicCounterMapOperations.class)
204 .register(AtomixConsistentMapOperations.class)
205 .register(AtomixConsistentSetMultimapOperations.class)
206 .register(AtomixConsistentTreeMapOperations.class)
207 .register(AtomixCounterOperations.class)
208 .register(AtomixDocumentTreeOperations.class)
209 .register(AtomixLeaderElectorOperations.class)
210 .register(AtomixWorkQueueOperations.class)
211 .register(ArrayList.class)
212 .register(HashSet.class)
213 .register(DefaultRaftMember.class)
214 .register(MemberId.class)
215 .register(RaftMember.Type.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700216 .register(Instant.class)
217 .register(Configuration.class)
218 .register(byte[].class)
219 .register(long[].class)
220 .build());
221
222 protected TestClusterCommunicationServiceFactory communicationServiceFactory;
223 protected List<RaftMember> members = Lists.newCopyOnWriteArrayList();
224 protected List<RaftClient> clients = Lists.newCopyOnWriteArrayList();
225 protected List<RaftServer> servers = Lists.newCopyOnWriteArrayList();
226 protected int nextId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800227
228 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700229 * Creates the primitive service.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800230 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700231 * @return the primitive service
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800232 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700233 protected abstract RaftService createService();
234
235 /**
236 * Creates a new primitive.
237 *
238 * @param name the primitive name
239 * @return the primitive instance
240 */
241 protected T newPrimitive(String name) {
242 RaftClient client = createClient();
243 RaftProxy proxy = client.newProxyBuilder()
244 .withName(name)
245 .withServiceType("test")
246 .withReadConsistency(readConsistency())
247 .withCommunicationStrategy(communicationStrategy())
248 .build()
249 .open()
250 .join();
251 return createPrimitive(proxy);
252 }
253
254 /**
255 * Creates a new primitive instance.
256 *
257 * @param proxy the primitive proxy
258 * @return the primitive instance
259 */
260 protected abstract T createPrimitive(RaftProxy proxy);
261
262 /**
263 * Returns the proxy read consistency.
264 *
265 * @return the primitive read consistency
266 */
267 protected ReadConsistency readConsistency() {
268 return ReadConsistency.LINEARIZABLE;
269 }
270
271 /**
272 * Returns the proxy communication strategy.
273 *
274 * @return the primitive communication strategy
275 */
276 protected CommunicationStrategy communicationStrategy() {
277 return CommunicationStrategy.LEADER;
278 }
279
280 @Before
281 public void prepare() {
282 members.clear();
283 clients.clear();
284 servers.clear();
285 communicationServiceFactory = new TestClusterCommunicationServiceFactory();
286 createServers(3);
287 }
288
289 @After
290 public void cleanup() {
291 shutdown();
292 }
293
294 /**
295 * Shuts down clients and servers.
296 */
297 private void shutdown() {
298 clients.forEach(c -> {
299 try {
300 c.close().get(10, TimeUnit.SECONDS);
301 } catch (Exception e) {
302 }
303 });
304
305 servers.forEach(s -> {
306 try {
307 if (s.isRunning()) {
308 s.shutdown().get(10, TimeUnit.SECONDS);
309 }
310 } catch (Exception e) {
311 }
312 });
313
314 Path directory = Paths.get("target/primitives/");
315 if (Files.exists(directory)) {
316 try {
317 Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
318 @Override
319 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
320 Files.delete(file);
321 return FileVisitResult.CONTINUE;
322 }
323
324 @Override
325 public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
326 Files.delete(dir);
327 return FileVisitResult.CONTINUE;
328 }
329 });
330 } catch (IOException e) {
331 }
332 }
333 }
334
335 /**
336 * Returns the next unique member identifier.
337 *
338 * @return The next unique member identifier.
339 */
340 private MemberId nextMemberId() {
341 return MemberId.from(String.valueOf(++nextId));
342 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800343
344 /**
345 * Returns the next server address.
346 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700347 * @param type The startup member type.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800348 * @return The next server address.
349 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700350 private RaftMember nextMember(RaftMember.Type type) {
351 return new TestMember(nextMemberId(), type);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800352 }
353
354 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700355 * Creates a set of Raft servers.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800356 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700357 protected List<RaftServer> createServers(int nodes) {
358 List<RaftServer> servers = new ArrayList<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800359
360 for (int i = 0; i < nodes; i++) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700361 members.add(nextMember(RaftMember.Type.ACTIVE));
362 }
363
364 CountDownLatch latch = new CountDownLatch(nodes);
365 for (int i = 0; i < nodes; i++) {
366 RaftServer server = createServer(members.get(i));
367 server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList()))
368 .thenRun(latch::countDown);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800369 servers.add(server);
370 }
371
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700372 try {
373 latch.await(30000, TimeUnit.MILLISECONDS);
374 } catch (InterruptedException e) {
375 throw new RuntimeException(e);
376 }
377
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800378 return servers;
379 }
380
381 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700382 * Creates a Raft server.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800383 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700384 private RaftServer createServer(RaftMember member) {
385 RaftServer.Builder builder = RaftServer.newBuilder(member.memberId())
386 .withType(member.getType())
387 .withProtocol(new RaftServerCommunicator(
Jordan Halterman980a8c12017-09-22 18:01:19 -0700388 "partition-1",
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700389 PROTOCOL_SERIALIZER,
390 communicationServiceFactory.newCommunicationService(NodeId.nodeId(member.memberId().id()))))
391 .withStorage(RaftStorage.newBuilder()
392 .withStorageLevel(StorageLevel.MEMORY)
393 .withDirectory(new File(String.format("target/primitives/%s", member.memberId())))
394 .withSerializer(new AtomixSerializerAdapter(STORAGE_SERIALIZER))
395 .withMaxSegmentSize(1024 * 1024)
396 .build())
397 .addService("test", this::createService);
398
399 RaftServer server = builder.build();
400 servers.add(server);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800401 return server;
402 }
403
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700404 /**
405 * Creates a Raft client.
406 */
407 private RaftClient createClient() {
408 MemberId memberId = nextMemberId();
409 RaftClient client = RaftClient.newBuilder()
410 .withMemberId(memberId)
411 .withProtocol(new RaftClientCommunicator(
Jordan Halterman980a8c12017-09-22 18:01:19 -0700412 "partition-1",
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700413 PROTOCOL_SERIALIZER,
414 communicationServiceFactory.newCommunicationService(NodeId.nodeId(memberId.id()))))
415 .build();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800416
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700417 client.connect(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).join();
418 clients.add(client);
419 return client;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800420 }
421
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800422 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700423 * Test member.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800424 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700425 public static class TestMember implements RaftMember {
426 private final MemberId memberId;
427 private final Type type;
428
429 public TestMember(MemberId memberId, Type type) {
430 this.memberId = memberId;
431 this.type = type;
432 }
433
434 @Override
435 public MemberId memberId() {
436 return memberId;
437 }
438
439 @Override
440 public int hash() {
441 return memberId.hashCode();
442 }
443
444 @Override
445 public Type getType() {
446 return type;
447 }
448
449 @Override
450 public void addTypeChangeListener(Consumer<Type> listener) {
451
452 }
453
454 @Override
455 public void removeTypeChangeListener(Consumer<Type> listener) {
456
457 }
458
459 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700460 public Instant getLastUpdated() {
461 return Instant.now();
462 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700463 @Override
464 public CompletableFuture<Void> promote() {
465 return null;
466 }
467
468 @Override
469 public CompletableFuture<Void> promote(Type type) {
470 return null;
471 }
472
473 @Override
474 public CompletableFuture<Void> demote() {
475 return null;
476 }
477
478 @Override
479 public CompletableFuture<Void> demote(Type type) {
480 return null;
481 }
482
483 @Override
484 public CompletableFuture<Void> remove() {
485 return null;
486 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800487 }
488}