blob: dbfee3c315a9aae14045013f005d4ef90b4b139a [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Jordan Halterman2bf177c2017-06-29 01:49:08 -070018import java.io.File;
19import java.io.IOException;
20import java.nio.file.FileVisitResult;
21import java.nio.file.Files;
22import java.nio.file.Path;
23import java.nio.file.Paths;
24import java.nio.file.SimpleFileVisitor;
25import java.nio.file.attribute.BasicFileAttributes;
26import java.time.Instant;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.ArrayList;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import java.util.Collections;
29import java.util.HashSet;
30import java.util.LinkedList;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080031import java.util.List;
32import java.util.concurrent.CompletableFuture;
33import java.util.concurrent.CountDownLatch;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070034import java.util.concurrent.TimeUnit;
35import java.util.function.Consumer;
36import java.util.stream.Collectors;
37
38import com.google.common.collect.Lists;
39import io.atomix.protocols.raft.RaftClient;
40import io.atomix.protocols.raft.RaftError;
41import io.atomix.protocols.raft.RaftServer;
42import io.atomix.protocols.raft.ReadConsistency;
43import io.atomix.protocols.raft.cluster.MemberId;
44import io.atomix.protocols.raft.cluster.RaftMember;
45import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
46import io.atomix.protocols.raft.event.RaftEvent;
47import io.atomix.protocols.raft.event.impl.DefaultEventType;
48import io.atomix.protocols.raft.operation.OperationType;
49import io.atomix.protocols.raft.operation.RaftOperation;
50import io.atomix.protocols.raft.operation.impl.DefaultOperationId;
51import io.atomix.protocols.raft.protocol.AppendRequest;
52import io.atomix.protocols.raft.protocol.AppendResponse;
53import io.atomix.protocols.raft.protocol.CloseSessionRequest;
54import io.atomix.protocols.raft.protocol.CloseSessionResponse;
55import io.atomix.protocols.raft.protocol.CommandRequest;
56import io.atomix.protocols.raft.protocol.CommandResponse;
57import io.atomix.protocols.raft.protocol.ConfigureRequest;
58import io.atomix.protocols.raft.protocol.ConfigureResponse;
59import io.atomix.protocols.raft.protocol.InstallRequest;
60import io.atomix.protocols.raft.protocol.InstallResponse;
61import io.atomix.protocols.raft.protocol.JoinRequest;
62import io.atomix.protocols.raft.protocol.JoinResponse;
63import io.atomix.protocols.raft.protocol.KeepAliveRequest;
64import io.atomix.protocols.raft.protocol.KeepAliveResponse;
65import io.atomix.protocols.raft.protocol.LeaveRequest;
66import io.atomix.protocols.raft.protocol.LeaveResponse;
67import io.atomix.protocols.raft.protocol.MetadataRequest;
68import io.atomix.protocols.raft.protocol.MetadataResponse;
69import io.atomix.protocols.raft.protocol.OpenSessionRequest;
70import io.atomix.protocols.raft.protocol.OpenSessionResponse;
71import io.atomix.protocols.raft.protocol.PollRequest;
72import io.atomix.protocols.raft.protocol.PollResponse;
73import io.atomix.protocols.raft.protocol.PublishRequest;
74import io.atomix.protocols.raft.protocol.QueryRequest;
75import io.atomix.protocols.raft.protocol.QueryResponse;
76import io.atomix.protocols.raft.protocol.RaftResponse;
77import io.atomix.protocols.raft.protocol.ReconfigureRequest;
78import io.atomix.protocols.raft.protocol.ReconfigureResponse;
79import io.atomix.protocols.raft.protocol.ResetRequest;
80import io.atomix.protocols.raft.protocol.VoteRequest;
81import io.atomix.protocols.raft.protocol.VoteResponse;
82import io.atomix.protocols.raft.proxy.CommunicationStrategy;
83import io.atomix.protocols.raft.proxy.RaftProxy;
84import io.atomix.protocols.raft.service.RaftService;
85import io.atomix.protocols.raft.session.SessionId;
86import io.atomix.protocols.raft.storage.RaftStorage;
87import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
88import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
89import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
90import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
91import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
92import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
93import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
94import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
95import io.atomix.protocols.raft.storage.system.Configuration;
96import io.atomix.storage.StorageLevel;
97import org.junit.After;
98import org.junit.Before;
99import org.onlab.util.KryoNamespace;
100import org.onosproject.cluster.NodeId;
101import org.onosproject.cluster.PartitionId;
102import org.onosproject.store.primitives.impl.RaftClientCommunicator;
103import org.onosproject.store.primitives.impl.RaftServerCommunicator;
104import org.onosproject.store.service.Serializer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800105
/**
 * Base class for various Atomix tests.
 *
 * @param <T> the Raft primitive type being tested
 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700111public abstract class AtomixTestBase<T extends AbstractRaftPrimitive> {
112
113 private static final Serializer PROTOCOL_SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
114 .register(OpenSessionRequest.class)
115 .register(OpenSessionResponse.class)
116 .register(CloseSessionRequest.class)
117 .register(CloseSessionResponse.class)
118 .register(KeepAliveRequest.class)
119 .register(KeepAliveResponse.class)
120 .register(QueryRequest.class)
121 .register(QueryResponse.class)
122 .register(CommandRequest.class)
123 .register(CommandResponse.class)
124 .register(MetadataRequest.class)
125 .register(MetadataResponse.class)
126 .register(JoinRequest.class)
127 .register(JoinResponse.class)
128 .register(LeaveRequest.class)
129 .register(LeaveResponse.class)
130 .register(ConfigureRequest.class)
131 .register(ConfigureResponse.class)
132 .register(ReconfigureRequest.class)
133 .register(ReconfigureResponse.class)
134 .register(InstallRequest.class)
135 .register(InstallResponse.class)
136 .register(PollRequest.class)
137 .register(PollResponse.class)
138 .register(VoteRequest.class)
139 .register(VoteResponse.class)
140 .register(AppendRequest.class)
141 .register(AppendResponse.class)
142 .register(PublishRequest.class)
143 .register(ResetRequest.class)
144 .register(RaftResponse.Status.class)
145 .register(RaftError.class)
146 .register(RaftError.Type.class)
147 .register(ReadConsistency.class)
148 .register(byte[].class)
149 .register(long[].class)
150 .register(CloseSessionEntry.class)
151 .register(CommandEntry.class)
152 .register(ConfigurationEntry.class)
153 .register(InitializeEntry.class)
154 .register(KeepAliveEntry.class)
155 .register(MetadataEntry.class)
156 .register(OpenSessionEntry.class)
157 .register(QueryEntry.class)
158 .register(RaftOperation.class)
159 .register(RaftEvent.class)
160 .register(DefaultEventType.class)
161 .register(DefaultOperationId.class)
162 .register(OperationType.class)
163 .register(ReadConsistency.class)
164 .register(ArrayList.class)
165 .register(LinkedList.class)
166 .register(Collections.emptyList().getClass())
167 .register(HashSet.class)
168 .register(DefaultRaftMember.class)
169 .register(MemberId.class)
170 .register(SessionId.class)
171 .register(RaftMember.Type.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700172 .register(Instant.class)
173 .register(Configuration.class)
174 .register(AtomixAtomicCounterMapOperations.class)
175 .register(AtomixConsistentMapEvents.class)
176 .register(AtomixConsistentMapOperations.class)
177 .register(AtomixConsistentSetMultimapOperations.class)
178 .register(AtomixConsistentSetMultimapEvents.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700179 .register(AtomixConsistentTreeMapOperations.class)
180 .register(AtomixCounterOperations.class)
181 .register(AtomixDocumentTreeEvents.class)
182 .register(AtomixDocumentTreeOperations.class)
183 .register(AtomixLeaderElectorEvents.class)
184 .register(AtomixLeaderElectorOperations.class)
185 .register(AtomixWorkQueueEvents.class)
186 .register(AtomixWorkQueueOperations.class)
187 .build());
188
189 private static final Serializer STORAGE_SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
190 .register(CloseSessionEntry.class)
191 .register(CommandEntry.class)
192 .register(ConfigurationEntry.class)
193 .register(InitializeEntry.class)
194 .register(KeepAliveEntry.class)
195 .register(MetadataEntry.class)
196 .register(OpenSessionEntry.class)
197 .register(QueryEntry.class)
198 .register(RaftOperation.class)
199 .register(ReadConsistency.class)
200 .register(AtomixAtomicCounterMapOperations.class)
201 .register(AtomixConsistentMapOperations.class)
202 .register(AtomixConsistentSetMultimapOperations.class)
203 .register(AtomixConsistentTreeMapOperations.class)
204 .register(AtomixCounterOperations.class)
205 .register(AtomixDocumentTreeOperations.class)
206 .register(AtomixLeaderElectorOperations.class)
207 .register(AtomixWorkQueueOperations.class)
208 .register(ArrayList.class)
209 .register(HashSet.class)
210 .register(DefaultRaftMember.class)
211 .register(MemberId.class)
212 .register(RaftMember.Type.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700213 .register(Instant.class)
214 .register(Configuration.class)
215 .register(byte[].class)
216 .register(long[].class)
217 .build());
218
219 protected TestClusterCommunicationServiceFactory communicationServiceFactory;
220 protected List<RaftMember> members = Lists.newCopyOnWriteArrayList();
221 protected List<RaftClient> clients = Lists.newCopyOnWriteArrayList();
222 protected List<RaftServer> servers = Lists.newCopyOnWriteArrayList();
223 protected int nextId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800224
225 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700226 * Creates the primitive service.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800227 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700228 * @return the primitive service
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800229 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700230 protected abstract RaftService createService();
231
232 /**
233 * Creates a new primitive.
234 *
235 * @param name the primitive name
236 * @return the primitive instance
237 */
238 protected T newPrimitive(String name) {
239 RaftClient client = createClient();
240 RaftProxy proxy = client.newProxyBuilder()
241 .withName(name)
242 .withServiceType("test")
243 .withReadConsistency(readConsistency())
244 .withCommunicationStrategy(communicationStrategy())
245 .build()
246 .open()
247 .join();
248 return createPrimitive(proxy);
249 }
250
251 /**
252 * Creates a new primitive instance.
253 *
254 * @param proxy the primitive proxy
255 * @return the primitive instance
256 */
257 protected abstract T createPrimitive(RaftProxy proxy);
258
259 /**
260 * Returns the proxy read consistency.
261 *
262 * @return the primitive read consistency
263 */
264 protected ReadConsistency readConsistency() {
265 return ReadConsistency.LINEARIZABLE;
266 }
267
268 /**
269 * Returns the proxy communication strategy.
270 *
271 * @return the primitive communication strategy
272 */
273 protected CommunicationStrategy communicationStrategy() {
274 return CommunicationStrategy.LEADER;
275 }
276
277 @Before
278 public void prepare() {
279 members.clear();
280 clients.clear();
281 servers.clear();
282 communicationServiceFactory = new TestClusterCommunicationServiceFactory();
283 createServers(3);
284 }
285
286 @After
287 public void cleanup() {
288 shutdown();
289 }
290
291 /**
292 * Shuts down clients and servers.
293 */
294 private void shutdown() {
295 clients.forEach(c -> {
296 try {
297 c.close().get(10, TimeUnit.SECONDS);
298 } catch (Exception e) {
299 }
300 });
301
302 servers.forEach(s -> {
303 try {
304 if (s.isRunning()) {
305 s.shutdown().get(10, TimeUnit.SECONDS);
306 }
307 } catch (Exception e) {
308 }
309 });
310
311 Path directory = Paths.get("target/primitives/");
312 if (Files.exists(directory)) {
313 try {
314 Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
315 @Override
316 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
317 Files.delete(file);
318 return FileVisitResult.CONTINUE;
319 }
320
321 @Override
322 public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
323 Files.delete(dir);
324 return FileVisitResult.CONTINUE;
325 }
326 });
327 } catch (IOException e) {
328 }
329 }
330 }
331
332 /**
333 * Returns the next unique member identifier.
334 *
335 * @return The next unique member identifier.
336 */
337 private MemberId nextMemberId() {
338 return MemberId.from(String.valueOf(++nextId));
339 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800340
341 /**
342 * Returns the next server address.
343 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700344 * @param type The startup member type.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800345 * @return The next server address.
346 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700347 private RaftMember nextMember(RaftMember.Type type) {
348 return new TestMember(nextMemberId(), type);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800349 }
350
351 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700352 * Creates a set of Raft servers.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800353 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700354 protected List<RaftServer> createServers(int nodes) {
355 List<RaftServer> servers = new ArrayList<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800356
357 for (int i = 0; i < nodes; i++) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700358 members.add(nextMember(RaftMember.Type.ACTIVE));
359 }
360
361 CountDownLatch latch = new CountDownLatch(nodes);
362 for (int i = 0; i < nodes; i++) {
363 RaftServer server = createServer(members.get(i));
364 server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList()))
365 .thenRun(latch::countDown);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800366 servers.add(server);
367 }
368
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700369 try {
370 latch.await(30000, TimeUnit.MILLISECONDS);
371 } catch (InterruptedException e) {
372 throw new RuntimeException(e);
373 }
374
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800375 return servers;
376 }
377
378 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700379 * Creates a Raft server.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800380 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700381 private RaftServer createServer(RaftMember member) {
382 RaftServer.Builder builder = RaftServer.newBuilder(member.memberId())
383 .withType(member.getType())
384 .withProtocol(new RaftServerCommunicator(
385 PartitionId.from(1),
386 PROTOCOL_SERIALIZER,
387 communicationServiceFactory.newCommunicationService(NodeId.nodeId(member.memberId().id()))))
388 .withStorage(RaftStorage.newBuilder()
389 .withStorageLevel(StorageLevel.MEMORY)
390 .withDirectory(new File(String.format("target/primitives/%s", member.memberId())))
391 .withSerializer(new AtomixSerializerAdapter(STORAGE_SERIALIZER))
392 .withMaxSegmentSize(1024 * 1024)
393 .build())
394 .addService("test", this::createService);
395
396 RaftServer server = builder.build();
397 servers.add(server);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800398 return server;
399 }
400
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700401 /**
402 * Creates a Raft client.
403 */
404 private RaftClient createClient() {
405 MemberId memberId = nextMemberId();
406 RaftClient client = RaftClient.newBuilder()
407 .withMemberId(memberId)
408 .withProtocol(new RaftClientCommunicator(
409 PartitionId.from(1),
410 PROTOCOL_SERIALIZER,
411 communicationServiceFactory.newCommunicationService(NodeId.nodeId(memberId.id()))))
412 .build();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800413
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700414 client.connect(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).join();
415 clients.add(client);
416 return client;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800417 }
418
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800419 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700420 * Test member.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800421 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700422 public static class TestMember implements RaftMember {
423 private final MemberId memberId;
424 private final Type type;
425
426 public TestMember(MemberId memberId, Type type) {
427 this.memberId = memberId;
428 this.type = type;
429 }
430
431 @Override
432 public MemberId memberId() {
433 return memberId;
434 }
435
436 @Override
437 public int hash() {
438 return memberId.hashCode();
439 }
440
441 @Override
442 public Type getType() {
443 return type;
444 }
445
446 @Override
447 public void addTypeChangeListener(Consumer<Type> listener) {
448
449 }
450
451 @Override
452 public void removeTypeChangeListener(Consumer<Type> listener) {
453
454 }
455
456 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700457 public Instant getLastUpdated() {
458 return Instant.now();
459 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700460 @Override
461 public CompletableFuture<Void> promote() {
462 return null;
463 }
464
465 @Override
466 public CompletableFuture<Void> promote(Type type) {
467 return null;
468 }
469
470 @Override
471 public CompletableFuture<Void> demote() {
472 return null;
473 }
474
475 @Override
476 public CompletableFuture<Void> demote(Type type) {
477 return null;
478 }
479
480 @Override
481 public CompletableFuture<Void> remove() {
482 return null;
483 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800484 }
485}