blob: bb9f54c1c38ce5bec16b6652d0fd64b0685ebe7f [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Jordan Halterman2bf177c2017-06-29 01:49:08 -070018import java.io.File;
19import java.io.IOException;
20import java.nio.file.FileVisitResult;
21import java.nio.file.Files;
22import java.nio.file.Path;
23import java.nio.file.Paths;
24import java.nio.file.SimpleFileVisitor;
25import java.nio.file.attribute.BasicFileAttributes;
26import java.time.Instant;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.ArrayList;
28import java.util.List;
29import java.util.concurrent.CompletableFuture;
30import java.util.concurrent.CountDownLatch;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070031import java.util.concurrent.TimeUnit;
32import java.util.function.Consumer;
33import java.util.stream.Collectors;
34
35import com.google.common.collect.Lists;
36import io.atomix.protocols.raft.RaftClient;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070037import io.atomix.protocols.raft.RaftServer;
38import io.atomix.protocols.raft.ReadConsistency;
39import io.atomix.protocols.raft.cluster.MemberId;
40import io.atomix.protocols.raft.cluster.RaftMember;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070041import io.atomix.protocols.raft.proxy.CommunicationStrategy;
42import io.atomix.protocols.raft.proxy.RaftProxy;
43import io.atomix.protocols.raft.service.RaftService;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070044import io.atomix.protocols.raft.storage.RaftStorage;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070045import io.atomix.storage.StorageLevel;
46import org.junit.After;
47import org.junit.Before;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070048import org.onosproject.cluster.NodeId;
49import org.onosproject.cluster.PartitionId;
50import org.onosproject.store.primitives.impl.RaftClientCommunicator;
51import org.onosproject.store.primitives.impl.RaftServerCommunicator;
Jordan Haltermanc9571992017-11-03 13:43:19 -070052import org.onosproject.store.primitives.impl.StorageNamespaces;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070053import org.onosproject.store.service.Serializer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054
55/**
Aaron Kruglikovb5a41e52016-06-23 15:37:41 -070056 * Base class for various Atomix tests.
Jordan Halterman2bf177c2017-06-29 01:49:08 -070057 *
58 * @param <T> the Raft primitive type being tested
Madan Jampani5e5b3d62016-02-01 16:03:33 -080059 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070060public abstract class AtomixTestBase<T extends AbstractRaftPrimitive> {
61
Jordan Halterman2bf177c2017-06-29 01:49:08 -070062 protected TestClusterCommunicationServiceFactory communicationServiceFactory;
63 protected List<RaftMember> members = Lists.newCopyOnWriteArrayList();
64 protected List<RaftClient> clients = Lists.newCopyOnWriteArrayList();
65 protected List<RaftServer> servers = Lists.newCopyOnWriteArrayList();
66 protected int nextId;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080067
68 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -070069 * Creates the primitive service.
Madan Jampani5e5b3d62016-02-01 16:03:33 -080070 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -070071 * @return the primitive service
Madan Jampani5e5b3d62016-02-01 16:03:33 -080072 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070073 protected abstract RaftService createService();
74
75 /**
76 * Creates a new primitive.
77 *
78 * @param name the primitive name
79 * @return the primitive instance
80 */
81 protected T newPrimitive(String name) {
82 RaftClient client = createClient();
83 RaftProxy proxy = client.newProxyBuilder()
84 .withName(name)
85 .withServiceType("test")
86 .withReadConsistency(readConsistency())
87 .withCommunicationStrategy(communicationStrategy())
88 .build()
89 .open()
90 .join();
91 return createPrimitive(proxy);
92 }
93
94 /**
95 * Creates a new primitive instance.
96 *
97 * @param proxy the primitive proxy
98 * @return the primitive instance
99 */
100 protected abstract T createPrimitive(RaftProxy proxy);
101
102 /**
103 * Returns the proxy read consistency.
104 *
105 * @return the primitive read consistency
106 */
107 protected ReadConsistency readConsistency() {
108 return ReadConsistency.LINEARIZABLE;
109 }
110
111 /**
112 * Returns the proxy communication strategy.
113 *
114 * @return the primitive communication strategy
115 */
116 protected CommunicationStrategy communicationStrategy() {
117 return CommunicationStrategy.LEADER;
118 }
119
120 @Before
121 public void prepare() {
122 members.clear();
123 clients.clear();
124 servers.clear();
125 communicationServiceFactory = new TestClusterCommunicationServiceFactory();
126 createServers(3);
127 }
128
129 @After
130 public void cleanup() {
131 shutdown();
132 }
133
134 /**
135 * Shuts down clients and servers.
136 */
137 private void shutdown() {
138 clients.forEach(c -> {
139 try {
140 c.close().get(10, TimeUnit.SECONDS);
141 } catch (Exception e) {
142 }
143 });
144
145 servers.forEach(s -> {
146 try {
147 if (s.isRunning()) {
148 s.shutdown().get(10, TimeUnit.SECONDS);
149 }
150 } catch (Exception e) {
151 }
152 });
153
154 Path directory = Paths.get("target/primitives/");
155 if (Files.exists(directory)) {
156 try {
157 Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
158 @Override
159 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
160 Files.delete(file);
161 return FileVisitResult.CONTINUE;
162 }
163
164 @Override
165 public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
166 Files.delete(dir);
167 return FileVisitResult.CONTINUE;
168 }
169 });
170 } catch (IOException e) {
171 }
172 }
173 }
174
175 /**
176 * Returns the next unique member identifier.
177 *
178 * @return The next unique member identifier.
179 */
180 private MemberId nextMemberId() {
181 return MemberId.from(String.valueOf(++nextId));
182 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800183
184 /**
185 * Returns the next server address.
186 *
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700187 * @param type The startup member type.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800188 * @return The next server address.
189 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700190 private RaftMember nextMember(RaftMember.Type type) {
191 return new TestMember(nextMemberId(), type);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800192 }
193
194 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700195 * Creates a set of Raft servers.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800196 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700197 protected List<RaftServer> createServers(int nodes) {
198 List<RaftServer> servers = new ArrayList<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800199
200 for (int i = 0; i < nodes; i++) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700201 members.add(nextMember(RaftMember.Type.ACTIVE));
202 }
203
204 CountDownLatch latch = new CountDownLatch(nodes);
205 for (int i = 0; i < nodes; i++) {
206 RaftServer server = createServer(members.get(i));
207 server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList()))
208 .thenRun(latch::countDown);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800209 servers.add(server);
210 }
211
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700212 try {
213 latch.await(30000, TimeUnit.MILLISECONDS);
214 } catch (InterruptedException e) {
215 throw new RuntimeException(e);
216 }
217
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800218 return servers;
219 }
220
221 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700222 * Creates a Raft server.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800223 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700224 private RaftServer createServer(RaftMember member) {
225 RaftServer.Builder builder = RaftServer.newBuilder(member.memberId())
226 .withType(member.getType())
227 .withProtocol(new RaftServerCommunicator(
228 PartitionId.from(1),
Jordan Haltermanc9571992017-11-03 13:43:19 -0700229 Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700230 communicationServiceFactory.newCommunicationService(NodeId.nodeId(member.memberId().id()))))
231 .withStorage(RaftStorage.newBuilder()
232 .withStorageLevel(StorageLevel.MEMORY)
233 .withDirectory(new File(String.format("target/primitives/%s", member.memberId())))
Jordan Haltermanc9571992017-11-03 13:43:19 -0700234 .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700235 .withMaxSegmentSize(1024 * 1024)
236 .build())
237 .addService("test", this::createService);
238
239 RaftServer server = builder.build();
240 servers.add(server);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800241 return server;
242 }
243
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700244 /**
245 * Creates a Raft client.
246 */
247 private RaftClient createClient() {
248 MemberId memberId = nextMemberId();
249 RaftClient client = RaftClient.newBuilder()
250 .withMemberId(memberId)
251 .withProtocol(new RaftClientCommunicator(
252 PartitionId.from(1),
Jordan Haltermanc9571992017-11-03 13:43:19 -0700253 Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700254 communicationServiceFactory.newCommunicationService(NodeId.nodeId(memberId.id()))))
255 .build();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800256
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700257 client.connect(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).join();
258 clients.add(client);
259 return client;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800260 }
261
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800262 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700263 * Test member.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800264 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700265 public static class TestMember implements RaftMember {
266 private final MemberId memberId;
267 private final Type type;
268
269 public TestMember(MemberId memberId, Type type) {
270 this.memberId = memberId;
271 this.type = type;
272 }
273
274 @Override
275 public MemberId memberId() {
276 return memberId;
277 }
278
279 @Override
280 public int hash() {
281 return memberId.hashCode();
282 }
283
284 @Override
285 public Type getType() {
286 return type;
287 }
288
289 @Override
290 public void addTypeChangeListener(Consumer<Type> listener) {
291
292 }
293
294 @Override
295 public void removeTypeChangeListener(Consumer<Type> listener) {
296
297 }
298
299 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700300 public Instant getLastUpdated() {
301 return Instant.now();
302 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700303 @Override
304 public CompletableFuture<Void> promote() {
305 return null;
306 }
307
308 @Override
309 public CompletableFuture<Void> promote(Type type) {
310 return null;
311 }
312
313 @Override
314 public CompletableFuture<Void> demote() {
315 return null;
316 }
317
318 @Override
319 public CompletableFuture<Void> demote(Type type) {
320 return null;
321 }
322
323 @Override
324 public CompletableFuture<Void> remove() {
325 return null;
326 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800327 }
328}