blob: a5c197b46adcd1e67e846361560be51b369e383b [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import io.atomix.Atomix;
19import io.atomix.AtomixClient;
20import io.atomix.catalyst.serializer.Serializer;
21import io.atomix.catalyst.transport.Address;
22import io.atomix.catalyst.transport.LocalServerRegistry;
23import io.atomix.catalyst.transport.LocalTransport;
24import io.atomix.copycat.client.CopycatClient;
25import io.atomix.copycat.server.CopycatServer;
26import io.atomix.copycat.server.storage.Storage;
27import io.atomix.copycat.server.storage.StorageLevel;
28import io.atomix.manager.state.ResourceManagerState;
29import io.atomix.resource.ResourceRegistry;
30import io.atomix.resource.ResourceType;
31
32import java.io.File;
33import java.io.IOException;
34import java.nio.file.Files;
35import java.time.Duration;
36import java.util.ArrayList;
37import java.util.List;
38import java.util.concurrent.CompletableFuture;
39import java.util.concurrent.CountDownLatch;
40
41import org.junit.After;
42import org.junit.Before;
43import org.onosproject.store.primitives.impl.CatalystSerializers;
44
45import com.google.common.util.concurrent.Uninterruptibles;
46
47/**
48 * Base class for various Atomix* tests.
49 */
50public abstract class AtomixTestBase {
51 private static final File TEST_DIR = new File("target/test-logs");
52 protected LocalServerRegistry registry;
53 protected int port;
54 protected List<Address> members;
55 protected List<CopycatClient> copycatClients = new ArrayList<>();
56 protected List<CopycatServer> copycatServers = new ArrayList<>();
57 protected List<Atomix> atomixClients = new ArrayList<>();
58 protected List<CopycatServer> atomixServers = new ArrayList<>();
59 protected Serializer serializer = CatalystSerializers.getSerializer();
60
61 /**
62 * Creates a new resource state machine.
63 *
64 * @return A new resource state machine.
65 */
66 protected abstract ResourceType resourceType();
67
68 /**
69 * Returns the next server address.
70 *
71 * @return The next server address.
72 */
73 private Address nextAddress() {
74 Address address = new Address("localhost", port++);
75 members.add(address);
76 return address;
77 }
78
79 /**
80 * Creates a set of Copycat servers.
81 */
82 protected List<CopycatServer> createCopycatServers(int nodes) throws Throwable {
83 CountDownLatch latch = new CountDownLatch(nodes);
84 List<CopycatServer> servers = new ArrayList<>();
85
86 List<Address> members = new ArrayList<>();
87 for (int i = 0; i < nodes; i++) {
88 members.add(nextAddress());
89 }
90
91 for (int i = 0; i < nodes; i++) {
92 CopycatServer server = createCopycatServer(members.get(i));
93 server.open().thenRun(latch::countDown);
94 servers.add(server);
95 }
96
97 Uninterruptibles.awaitUninterruptibly(latch);
98
99 return servers;
100 }
101
102 /**
103 * Creates a Copycat server.
104 */
105 protected CopycatServer createCopycatServer(Address address) {
106 ResourceRegistry resourceRegistry = new ResourceRegistry();
107 resourceRegistry.register(resourceType());
108 CopycatServer server = CopycatServer.builder(address, members)
109 .withTransport(new LocalTransport(registry))
110 .withStorage(Storage.builder()
111 .withStorageLevel(StorageLevel.DISK)
112 .withDirectory(TEST_DIR + "/" + address.port())
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800113 .build())
114 .withStateMachine(() -> new ResourceManagerState(resourceRegistry))
115 .withSerializer(serializer.clone())
116 .withHeartbeatInterval(Duration.ofMillis(25))
117 .withElectionTimeout(Duration.ofMillis(50))
118 .withSessionTimeout(Duration.ofMillis(100))
119 .build();
120 copycatServers.add(server);
121 return server;
122 }
123
124 @Before
125 @After
126 public void clearTests() throws Exception {
127 registry = new LocalServerRegistry();
128 members = new ArrayList<>();
129 port = 5000;
130
131 CompletableFuture<Void> closeClients =
132 CompletableFuture.allOf(atomixClients.stream()
133 .map(Atomix::close)
134 .toArray(CompletableFuture[]::new));
135
136 closeClients.thenCompose(v -> CompletableFuture.allOf(copycatServers.stream()
137 .map(CopycatServer::close)
138 .toArray(CompletableFuture[]::new))).join();
139
140 deleteDirectory(TEST_DIR);
141
142 atomixClients = new ArrayList<>();
143
144 copycatServers = new ArrayList<>();
145 }
146
147 /**
148 * Deletes a directory recursively.
149 */
150 private void deleteDirectory(File directory) throws IOException {
151 if (directory.exists()) {
152 File[] files = directory.listFiles();
153 if (files != null) {
154 for (File file : files) {
155 if (file.isDirectory()) {
156 deleteDirectory(file);
157 } else {
158 Files.delete(file.toPath());
159 }
160 }
161 }
162 Files.delete(directory.toPath());
163 }
164 }
165
166 /**
167 * Creates a Atomix client.
168 */
169 protected Atomix createAtomixClient() {
170 CountDownLatch latch = new CountDownLatch(1);
171 Atomix client = AtomixClient.builder(members)
172 .withTransport(new LocalTransport(registry))
173 .withSerializer(serializer.clone())
174 .withResourceResolver(r -> r.register(resourceType()))
175 .build();
176 client.open().thenRun(latch::countDown);
177 atomixClients.add(client);
178 Uninterruptibles.awaitUninterruptibly(latch);
179 return client;
180 }
181}