blob: 6f3ddeaf19b4780f4b69eb6b6c6fd5e462d26b6f [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Jordan Halterman2bf177c2017-06-29 01:49:08 -070018import java.io.File;
19import java.io.IOException;
20import java.nio.file.FileVisitResult;
21import java.nio.file.Files;
22import java.nio.file.Path;
23import java.nio.file.Paths;
24import java.nio.file.SimpleFileVisitor;
25import java.nio.file.attribute.BasicFileAttributes;
26import java.time.Instant;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.ArrayList;
28import java.util.List;
29import java.util.concurrent.CompletableFuture;
30import java.util.concurrent.CountDownLatch;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070031import java.util.concurrent.TimeUnit;
32import java.util.function.Consumer;
33import java.util.stream.Collectors;
34
35import com.google.common.collect.Lists;
36import io.atomix.protocols.raft.RaftClient;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070037import io.atomix.protocols.raft.RaftServer;
38import io.atomix.protocols.raft.ReadConsistency;
39import io.atomix.protocols.raft.cluster.MemberId;
40import io.atomix.protocols.raft.cluster.RaftMember;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070041import io.atomix.protocols.raft.proxy.CommunicationStrategy;
42import io.atomix.protocols.raft.proxy.RaftProxy;
43import io.atomix.protocols.raft.service.RaftService;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070044import io.atomix.protocols.raft.storage.RaftStorage;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070045import io.atomix.storage.StorageLevel;
46import org.junit.After;
47import org.junit.Before;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070048import org.onosproject.cluster.NodeId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070049import org.onosproject.store.primitives.impl.RaftClientCommunicator;
50import org.onosproject.store.primitives.impl.RaftServerCommunicator;
Jordan Halterman309d3452017-11-03 13:43:19 -070051import org.onosproject.store.primitives.impl.StorageNamespaces;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070052import org.onosproject.store.service.Serializer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080053
54/**
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -070055 * Base class for various Atomix tests.
Jordan Halterman2bf177c2017-06-29 01:49:08 -070056 *
57 * @param <T> the Raft primitive type being tested
Madan Jampani5e5b3d62016-02-01 16:03:33 -080058 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070059public abstract class AtomixTestBase<T extends AbstractRaftPrimitive> {
60
Jordan Halterman2bf177c2017-06-29 01:49:08 -070061 protected TestClusterCommunicationServiceFactory communicationServiceFactory;
62 protected List<RaftMember> members = Lists.newCopyOnWriteArrayList();
63 protected List<RaftClient> clients = Lists.newCopyOnWriteArrayList();
64 protected List<RaftServer> servers = Lists.newCopyOnWriteArrayList();
65 protected int nextId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080066
67 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -070068 * Creates the primitive service.
Madan Jampani5e5b3d62016-02-01 16:03:33 -080069 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -070070 * @return the primitive service
Madan Jampani5e5b3d62016-02-01 16:03:33 -080071 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070072 protected abstract RaftService createService();
73
74 /**
75 * Creates a new primitive.
76 *
77 * @param name the primitive name
78 * @return the primitive instance
79 */
80 protected T newPrimitive(String name) {
81 RaftClient client = createClient();
82 RaftProxy proxy = client.newProxyBuilder()
83 .withName(name)
84 .withServiceType("test")
85 .withReadConsistency(readConsistency())
86 .withCommunicationStrategy(communicationStrategy())
87 .build()
88 .open()
89 .join();
90 return createPrimitive(proxy);
91 }
92
93 /**
94 * Creates a new primitive instance.
95 *
96 * @param proxy the primitive proxy
97 * @return the primitive instance
98 */
99 protected abstract T createPrimitive(RaftProxy proxy);
100
101 /**
102 * Returns the proxy read consistency.
103 *
104 * @return the primitive read consistency
105 */
106 protected ReadConsistency readConsistency() {
107 return ReadConsistency.LINEARIZABLE;
108 }
109
110 /**
111 * Returns the proxy communication strategy.
112 *
113 * @return the primitive communication strategy
114 */
115 protected CommunicationStrategy communicationStrategy() {
116 return CommunicationStrategy.LEADER;
117 }
118
119 @Before
120 public void prepare() {
121 members.clear();
122 clients.clear();
123 servers.clear();
124 communicationServiceFactory = new TestClusterCommunicationServiceFactory();
125 createServers(3);
126 }
127
128 @After
129 public void cleanup() {
130 shutdown();
131 }
132
133 /**
134 * Shuts down clients and servers.
135 */
136 private void shutdown() {
137 clients.forEach(c -> {
138 try {
139 c.close().get(10, TimeUnit.SECONDS);
140 } catch (Exception e) {
141 }
142 });
143
144 servers.forEach(s -> {
145 try {
146 if (s.isRunning()) {
147 s.shutdown().get(10, TimeUnit.SECONDS);
148 }
149 } catch (Exception e) {
150 }
151 });
152
153 Path directory = Paths.get("target/primitives/");
154 if (Files.exists(directory)) {
155 try {
156 Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
157 @Override
158 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
159 Files.delete(file);
160 return FileVisitResult.CONTINUE;
161 }
162
163 @Override
164 public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
165 Files.delete(dir);
166 return FileVisitResult.CONTINUE;
167 }
168 });
169 } catch (IOException e) {
170 }
171 }
172 }
173
174 /**
175 * Returns the next unique member identifier.
176 *
177 * @return The next unique member identifier.
178 */
179 private MemberId nextMemberId() {
180 return MemberId.from(String.valueOf(++nextId));
181 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800182
183 /**
184 * Returns the next server address.
185 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700186 * @param type The startup member type.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800187 * @return The next server address.
188 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700189 private RaftMember nextMember(RaftMember.Type type) {
190 return new TestMember(nextMemberId(), type);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800191 }
192
193 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700194 * Creates a set of Raft servers.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800195 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700196 protected List<RaftServer> createServers(int nodes) {
197 List<RaftServer> servers = new ArrayList<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800198
199 for (int i = 0; i < nodes; i++) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700200 members.add(nextMember(RaftMember.Type.ACTIVE));
201 }
202
203 CountDownLatch latch = new CountDownLatch(nodes);
204 for (int i = 0; i < nodes; i++) {
205 RaftServer server = createServer(members.get(i));
206 server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList()))
207 .thenRun(latch::countDown);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800208 servers.add(server);
209 }
210
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700211 try {
212 latch.await(30000, TimeUnit.MILLISECONDS);
213 } catch (InterruptedException e) {
Ray Milkeydbd38212018-07-02 09:18:09 -0700214 throw new IllegalStateException(e);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700215 }
216
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800217 return servers;
218 }
219
220 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700221 * Creates a Raft server.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800222 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700223 private RaftServer createServer(RaftMember member) {
224 RaftServer.Builder builder = RaftServer.newBuilder(member.memberId())
225 .withType(member.getType())
226 .withProtocol(new RaftServerCommunicator(
Jordan Halterman980a8c12017-09-22 18:01:19 -0700227 "partition-1",
Jordan Halterman309d3452017-11-03 13:43:19 -0700228 Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700229 communicationServiceFactory.newCommunicationService(NodeId.nodeId(member.memberId().id()))))
230 .withStorage(RaftStorage.newBuilder()
231 .withStorageLevel(StorageLevel.MEMORY)
232 .withDirectory(new File(String.format("target/primitives/%s", member.memberId())))
Jordan Halterman309d3452017-11-03 13:43:19 -0700233 .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700234 .withMaxSegmentSize(1024 * 1024)
235 .build())
236 .addService("test", this::createService);
237
238 RaftServer server = builder.build();
239 servers.add(server);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800240 return server;
241 }
242
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700243 /**
244 * Creates a Raft client.
245 */
246 private RaftClient createClient() {
247 MemberId memberId = nextMemberId();
248 RaftClient client = RaftClient.newBuilder()
249 .withMemberId(memberId)
250 .withProtocol(new RaftClientCommunicator(
Jordan Halterman980a8c12017-09-22 18:01:19 -0700251 "partition-1",
Jordan Halterman309d3452017-11-03 13:43:19 -0700252 Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700253 communicationServiceFactory.newCommunicationService(NodeId.nodeId(memberId.id()))))
254 .build();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800255
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700256 client.connect(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).join();
257 clients.add(client);
258 return client;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800259 }
260
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800261 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700262 * Test member.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800263 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700264 public static class TestMember implements RaftMember {
265 private final MemberId memberId;
266 private final Type type;
267
268 public TestMember(MemberId memberId, Type type) {
269 this.memberId = memberId;
270 this.type = type;
271 }
272
273 @Override
274 public MemberId memberId() {
275 return memberId;
276 }
277
278 @Override
279 public int hash() {
280 return memberId.hashCode();
281 }
282
283 @Override
284 public Type getType() {
285 return type;
286 }
287
288 @Override
289 public void addTypeChangeListener(Consumer<Type> listener) {
290
291 }
292
293 @Override
294 public void removeTypeChangeListener(Consumer<Type> listener) {
295
296 }
297
298 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700299 public Instant getLastUpdated() {
300 return Instant.now();
301 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700302 @Override
303 public CompletableFuture<Void> promote() {
304 return null;
305 }
306
307 @Override
308 public CompletableFuture<Void> promote(Type type) {
309 return null;
310 }
311
312 @Override
313 public CompletableFuture<Void> demote() {
314 return null;
315 }
316
317 @Override
318 public CompletableFuture<Void> demote(Type type) {
319 return null;
320 }
321
322 @Override
323 public CompletableFuture<Void> remove() {
324 return null;
325 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800326 }
327}