blob: 414c49c388b59de333ba8858a70a547e6b3d4b8b [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
Madan Jampani15b8ef52016-02-02 17:35:05 -080018import java.io.File;
19import java.util.Collection;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070020import java.util.Map;
Madan Jampani15b8ef52016-02-02 17:35:05 -080021import java.util.Optional;
Madan Jampanif172d402016-03-04 00:56:38 -080022import java.util.Set;
Madan Jampani15b8ef52016-02-02 17:35:05 -080023import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.atomic.AtomicBoolean;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070025import java.util.function.Supplier;
Madan Jampanif172d402016-03-04 00:56:38 -080026import java.util.stream.Collectors;
Madan Jampani15b8ef52016-02-02 17:35:05 -080027
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import com.google.common.collect.Collections2;
29import com.google.common.collect.ImmutableMap;
30import io.atomix.protocols.raft.cluster.MemberId;
31import io.atomix.protocols.raft.service.RaftService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070032import org.onosproject.cluster.MembershipService;
Madan Jampani15b8ef52016-02-02 17:35:05 -080033import org.onosproject.cluster.NodeId;
34import org.onosproject.cluster.Partition;
Madan Jampani33547452016-02-29 16:45:04 -080035import org.onosproject.cluster.PartitionId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070036import org.onosproject.core.Version;
37import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070038import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
39import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
40import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapService;
42import org.onosproject.store.primitives.resources.impl.AtomixCounterService;
43import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeService;
44import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorService;
45import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueService;
46import org.onosproject.store.service.DistributedPrimitive;
Jordan Haltermand0d80352017-08-10 15:08:27 -070047import org.onosproject.store.service.Ordering;
Madan Jampanie14a09c2016-02-11 10:43:21 -080048import org.onosproject.store.service.PartitionInfo;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070049import org.onosproject.store.service.Serializer;
Madan Jampani15b8ef52016-02-02 17:35:05 -080050
51/**
52 * Storage partition.
53 */
Madan Jampani33547452016-02-29 16:45:04 -080054public class StoragePartition implements Managed<StoragePartition> {
Madan Jampani15b8ef52016-02-02 17:35:05 -080055
56 private final AtomicBoolean isOpened = new AtomicBoolean(false);
Jordan Halterman980a8c12017-09-22 18:01:19 -070057 private final UnifiedClusterCommunicationService clusterCommunicator;
58 private final MembershipService clusterService;
59 private final Version version;
60 private final Version source;
61 private final File dataFolder;
Madan Jampani33547452016-02-29 16:45:04 -080062 private Partition partition;
Madan Jampani15b8ef52016-02-02 17:35:05 -080063 private NodeId localNodeId;
Madan Jampani33547452016-02-29 16:45:04 -080064 private StoragePartitionServer server;
Madan Jampani15b8ef52016-02-02 17:35:05 -080065 private StoragePartitionClient client;
66
Jordan Halterman2bf177c2017-06-29 01:49:08 -070067 public static final Map<String, Supplier<RaftService>> RAFT_SERVICES =
68 ImmutableMap.<String, Supplier<RaftService>>builder()
69 .put(DistributedPrimitive.Type.CONSISTENT_MAP.name(), AtomixConsistentMapService::new)
70 .put(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name(), AtomixConsistentTreeMapService::new)
71 .put(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name(), AtomixConsistentSetMultimapService::new)
72 .put(DistributedPrimitive.Type.COUNTER_MAP.name(), AtomixAtomicCounterMapService::new)
73 .put(DistributedPrimitive.Type.COUNTER.name(), AtomixCounterService::new)
74 .put(DistributedPrimitive.Type.LEADER_ELECTOR.name(), AtomixLeaderElectorService::new)
75 .put(DistributedPrimitive.Type.WORK_QUEUE.name(), AtomixWorkQueueService::new)
Jordan Haltermand0d80352017-08-10 15:08:27 -070076 .put(DistributedPrimitive.Type.DOCUMENT_TREE.name(),
77 () -> new AtomixDocumentTreeService(Ordering.NATURAL))
78 .put(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), Ordering.NATURAL),
79 () -> new AtomixDocumentTreeService(Ordering.NATURAL))
80 .put(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), Ordering.INSERTION),
81 () -> new AtomixDocumentTreeService(Ordering.INSERTION))
Jordan Halterman2bf177c2017-06-29 01:49:08 -070082 .build();
Madan Jampani65f24bb2016-03-15 15:16:18 -070083
Jordan Halterman980a8c12017-09-22 18:01:19 -070084 public StoragePartition(
85 Partition partition,
86 Version version,
87 Version source,
88 UnifiedClusterCommunicationService clusterCommunicator,
89 MembershipService clusterService,
90 File dataFolder) {
Madan Jampani33547452016-02-29 16:45:04 -080091 this.partition = partition;
Jordan Halterman980a8c12017-09-22 18:01:19 -070092 this.version = version;
93 this.source = source;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070094 this.clusterCommunicator = clusterCommunicator;
Jordan Halterman980a8c12017-09-22 18:01:19 -070095 this.clusterService = clusterService;
Madan Jampani15b8ef52016-02-02 17:35:05 -080096 this.localNodeId = clusterService.getLocalNode().id();
Jordan Halterman980a8c12017-09-22 18:01:19 -070097 this.dataFolder = dataFolder;
Madan Jampani15b8ef52016-02-02 17:35:05 -080098 }
99
Madan Jampani3a9911c2016-02-21 11:25:45 -0800100 /**
101 * Returns the partition client instance.
102 * @return client
103 */
Madan Jampani15b8ef52016-02-02 17:35:05 -0800104 public StoragePartitionClient client() {
105 return client;
106 }
107
108 @Override
109 public CompletableFuture<Void> open() {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700110 if (source != null) {
111 return forkServer(source)
112 .thenCompose(v -> openClient())
113 .thenAccept(v -> isOpened.set(true))
114 .thenApply(v -> null);
115 } else if (partition.getMembers().contains(localNodeId)) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700116 return openServer()
117 .thenCompose(v -> openClient())
118 .thenAccept(v -> isOpened.set(true))
119 .thenApply(v -> null);
Madan Jampanif172d402016-03-04 00:56:38 -0800120 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700121 return openClient()
122 .thenAccept(v -> isOpened.set(true))
123 .thenApply(v -> null);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800124 }
125
126 @Override
127 public CompletableFuture<Void> close() {
Madan Jampani33547452016-02-29 16:45:04 -0800128 // We do not explicitly close the server and instead let the cluster
129 // deal with this as an unclean exit.
Madan Jampani65f24bb2016-03-15 15:16:18 -0700130 return closeClient();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800131 }
132
Madan Jampani33547452016-02-29 16:45:04 -0800133 /**
Jordan Halterman980a8c12017-09-22 18:01:19 -0700134 * Returns the partition name.
135 *
136 * @return the partition name
137 */
138 public String getName() {
139 return getName(version);
140 }
141
142 /**
143 * Returns the partition name for the given version.
144 *
145 * @param version the version for which to return the partition name
146 * @return the partition name for the given version
147 */
148 String getName(Version version) {
149 return version != null ? String.format("partition-%d-%s", partition.getId().id(), version) : "partition-core";
150 }
151
152 /**
153 * Returns the partition version.
154 *
155 * @return the partition version
156 */
157 public Version getVersion() {
158 return version;
159 }
160
161 /**
162 * Returns the partition data folder.
163 *
164 * @return the partition data folder
165 */
166 public File getDataFolder() {
167 return dataFolder;
168 }
169
170 /**
Madan Jampani33547452016-02-29 16:45:04 -0800171 * Returns the identifier of the {@link Partition partition} associated with this instance.
172 * @return partition identifier
173 */
174 public PartitionId getId() {
175 return partition.getId();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800176 }
177
Madan Jampani33547452016-02-29 16:45:04 -0800178 /**
179 * Returns the identifiers of partition members.
180 * @return partition member instance ids
181 */
182 public Collection<NodeId> getMembers() {
183 return partition.getMembers();
184 }
185
186 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700187 * Returns the {@link MemberId identifiers} of partition members.
188 * @return partition member identifiers
Madan Jampani33547452016-02-29 16:45:04 -0800189 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700190 public Collection<MemberId> getMemberIds() {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700191 return source != null ?
192 clusterService.getNodes()
193 .stream()
194 .map(node -> MemberId.from(node.id().id()))
195 .collect(Collectors.toList()) :
196 Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
197 }
198
199 Collection<MemberId> getMemberIds(Version version) {
200 if (source == null || version.equals(source)) {
201 return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
202 } else {
203 return clusterService.getNodes()
204 .stream()
205 .map(node -> MemberId.from(node.id().id()))
206 .collect(Collectors.toList());
207 }
Madan Jampani33547452016-02-29 16:45:04 -0800208 }
209
Madan Jampanif172d402016-03-04 00:56:38 -0800210 /**
211 * Attempts to rejoin the partition.
212 * @return future that is completed after the operation is complete
213 */
Madan Jampani33547452016-02-29 16:45:04 -0800214 private CompletableFuture<Void> openServer() {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700215 StoragePartitionServer server = new StoragePartitionServer(
216 this,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700217 MemberId.from(localNodeId.id()),
Jordan Halterman980a8c12017-09-22 18:01:19 -0700218 clusterCommunicator);
Madan Jampani33547452016-02-29 16:45:04 -0800219 return server.open().thenRun(() -> this.server = server);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800220 }
221
Madan Jampanif172d402016-03-04 00:56:38 -0800222 /**
Jordan Halterman980a8c12017-09-22 18:01:19 -0700223 * Forks the server from the given version.
224 *
225 * @return future to be completed once the server has been forked
226 */
227 private CompletableFuture<Void> forkServer(Version version) {
228 StoragePartitionServer server = new StoragePartitionServer(
229 this,
230 MemberId.from(localNodeId.id()),
231 clusterCommunicator);
232
233 CompletableFuture<Void> future;
234 if (clusterService.getNodes().size() == 1) {
235 future = server.fork(version);
236 } else {
237 future = server.join(clusterService.getNodes().stream()
238 .filter(node -> !node.id().equals(localNodeId))
239 .map(node -> MemberId.from(node.id().id()))
240 .collect(Collectors.toList()));
241 }
242 return future.thenRun(() -> this.server = server);
243 }
244
245 /**
Madan Jampanif172d402016-03-04 00:56:38 -0800246 * Attempts to join the partition as a new member.
247 * @return future that is completed after the operation is complete
248 */
249 private CompletableFuture<Void> joinCluster() {
250 Set<NodeId> otherMembers = partition.getMembers()
251 .stream()
252 .filter(nodeId -> !nodeId.equals(localNodeId))
253 .collect(Collectors.toSet());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700254 StoragePartitionServer server = new StoragePartitionServer(this,
255 MemberId.from(localNodeId.id()),
Jordan Halterman980a8c12017-09-22 18:01:19 -0700256 clusterCommunicator);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700257 return server.join(Collections2.transform(otherMembers, n -> MemberId.from(n.id())))
258 .thenRun(() -> this.server = server);
Madan Jampanif172d402016-03-04 00:56:38 -0800259 }
260
Madan Jampani15b8ef52016-02-02 17:35:05 -0800261 private CompletableFuture<StoragePartitionClient> openClient() {
262 client = new StoragePartitionClient(this,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700263 MemberId.from(localNodeId.id()),
264 new RaftClientCommunicator(
Jordan Halterman980a8c12017-09-22 18:01:19 -0700265 getName(),
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700266 Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
267 clusterCommunicator));
Madan Jampani15b8ef52016-02-02 17:35:05 -0800268 return client.open().thenApply(v -> client);
269 }
270
Madan Jampani33547452016-02-29 16:45:04 -0800271 /**
272 * Closes the partition server if it was previously opened.
273 * @return future that is completed when the operation completes
274 */
Madan Jampanif172d402016-03-04 00:56:38 -0800275 public CompletableFuture<Void> leaveCluster() {
Madan Jampani33547452016-02-29 16:45:04 -0800276 return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
277 }
278
279 @Override
280 public boolean isOpen() {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700281 return isOpened.get();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800282 }
283
284 private CompletableFuture<Void> closeClient() {
285 if (client != null) {
286 return client.close();
287 }
288 return CompletableFuture.completedFuture(null);
289 }
290
Madan Jampanie14a09c2016-02-11 10:43:21 -0800291 /**
292 * Returns the partition information if this partition is locally managed i.e.
293 * this node is a active member of the partition.
294 * @return partition info
295 */
296 public Optional<PartitionInfo> info() {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700297 return server != null && server.isOpen() ? Optional.of(server.info()) : Optional.empty();
Madan Jampani33547452016-02-29 16:45:04 -0800298 }
299
Jon Hall1195afb2016-06-28 18:54:07 -0700300 /**
301 * Process updates to partitions and handles joining or leaving a partition.
302 * @param newValue new Partition
303 */
Madan Jampanif172d402016-03-04 00:56:38 -0800304 public void onUpdate(Partition newValue) {
Jon Hall1195afb2016-06-28 18:54:07 -0700305
306 boolean wasPresent = partition.getMembers().contains(localNodeId);
307 boolean isPresent = newValue.getMembers().contains(localNodeId);
Madan Jampanif172d402016-03-04 00:56:38 -0800308 this.partition = newValue;
Jon Hall1195afb2016-06-28 18:54:07 -0700309 if ((wasPresent && isPresent) || (!wasPresent && !isPresent)) {
310 // no action needed
311 return;
312 }
313 //only need to do action if our membership changed
314 if (wasPresent) {
Madan Jampanif172d402016-03-04 00:56:38 -0800315 leaveCluster();
Jon Hall1195afb2016-06-28 18:54:07 -0700316 } else if (isPresent) {
317 joinCluster();
Madan Jampani33547452016-02-29 16:45:04 -0800318 }
Madan Jampanie14a09c2016-02-11 10:43:21 -0800319 }
Madan Jampani15b8ef52016-02-02 17:35:05 -0800320}