blob: 842e047f38a037b8e211ed73375e757666e8561b [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
Madan Jampani15b8ef52016-02-02 17:35:05 -080018import java.io.File;
19import java.util.Collection;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070020import java.util.Map;
Madan Jampani15b8ef52016-02-02 17:35:05 -080021import java.util.Optional;
Madan Jampanif172d402016-03-04 00:56:38 -080022import java.util.Set;
Madan Jampani15b8ef52016-02-02 17:35:05 -080023import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.atomic.AtomicBoolean;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070025import java.util.function.Supplier;
Madan Jampanif172d402016-03-04 00:56:38 -080026import java.util.stream.Collectors;
Madan Jampani15b8ef52016-02-02 17:35:05 -080027
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import com.google.common.collect.Collections2;
29import com.google.common.collect.ImmutableMap;
30import io.atomix.protocols.raft.cluster.MemberId;
31import io.atomix.protocols.raft.service.RaftService;
Madan Jampani15b8ef52016-02-02 17:35:05 -080032import org.onosproject.cluster.ClusterService;
Madan Jampani15b8ef52016-02-02 17:35:05 -080033import org.onosproject.cluster.NodeId;
34import org.onosproject.cluster.Partition;
Madan Jampani33547452016-02-29 16:45:04 -080035import org.onosproject.cluster.PartitionId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070036import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
37import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
38import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
39import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
40import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapService;
41import org.onosproject.store.primitives.resources.impl.AtomixCounterService;
42import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeService;
43import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorService;
44import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueService;
45import org.onosproject.store.service.DistributedPrimitive;
Jordan Haltermand0d80352017-08-10 15:08:27 -070046import org.onosproject.store.service.Ordering;
Madan Jampanie14a09c2016-02-11 10:43:21 -080047import org.onosproject.store.service.PartitionInfo;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070048import org.onosproject.store.service.Serializer;
Madan Jampani15b8ef52016-02-02 17:35:05 -080049
50/**
51 * Storage partition.
52 */
Madan Jampani33547452016-02-29 16:45:04 -080053public class StoragePartition implements Managed<StoragePartition> {
Madan Jampani15b8ef52016-02-02 17:35:05 -080054
55 private final AtomicBoolean isOpened = new AtomicBoolean(false);
Jordan Halterman2bf177c2017-06-29 01:49:08 -070056 private final ClusterCommunicationService clusterCommunicator;
Madan Jampani15b8ef52016-02-02 17:35:05 -080057 private final File logFolder;
Madan Jampani33547452016-02-29 16:45:04 -080058 private Partition partition;
Madan Jampani15b8ef52016-02-02 17:35:05 -080059 private NodeId localNodeId;
Madan Jampani33547452016-02-29 16:45:04 -080060 private StoragePartitionServer server;
Madan Jampani15b8ef52016-02-02 17:35:05 -080061 private StoragePartitionClient client;
62
Jordan Halterman2bf177c2017-06-29 01:49:08 -070063 public static final Map<String, Supplier<RaftService>> RAFT_SERVICES =
64 ImmutableMap.<String, Supplier<RaftService>>builder()
65 .put(DistributedPrimitive.Type.CONSISTENT_MAP.name(), AtomixConsistentMapService::new)
66 .put(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name(), AtomixConsistentTreeMapService::new)
67 .put(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name(), AtomixConsistentSetMultimapService::new)
68 .put(DistributedPrimitive.Type.COUNTER_MAP.name(), AtomixAtomicCounterMapService::new)
69 .put(DistributedPrimitive.Type.COUNTER.name(), AtomixCounterService::new)
70 .put(DistributedPrimitive.Type.LEADER_ELECTOR.name(), AtomixLeaderElectorService::new)
71 .put(DistributedPrimitive.Type.WORK_QUEUE.name(), AtomixWorkQueueService::new)
Jordan Haltermand0d80352017-08-10 15:08:27 -070072 .put(DistributedPrimitive.Type.DOCUMENT_TREE.name(),
73 () -> new AtomixDocumentTreeService(Ordering.NATURAL))
74 .put(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), Ordering.NATURAL),
75 () -> new AtomixDocumentTreeService(Ordering.NATURAL))
76 .put(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), Ordering.INSERTION),
77 () -> new AtomixDocumentTreeService(Ordering.INSERTION))
Jordan Halterman2bf177c2017-06-29 01:49:08 -070078 .build();
Madan Jampani65f24bb2016-03-15 15:16:18 -070079
Madan Jampani15b8ef52016-02-02 17:35:05 -080080 public StoragePartition(Partition partition,
Jordan Halterman2bf177c2017-06-29 01:49:08 -070081 ClusterCommunicationService clusterCommunicator,
Madan Jampani15b8ef52016-02-02 17:35:05 -080082 ClusterService clusterService,
Madan Jampani15b8ef52016-02-02 17:35:05 -080083 File logFolder) {
Madan Jampani33547452016-02-29 16:45:04 -080084 this.partition = partition;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070085 this.clusterCommunicator = clusterCommunicator;
Madan Jampani15b8ef52016-02-02 17:35:05 -080086 this.localNodeId = clusterService.getLocalNode().id();
Madan Jampani15b8ef52016-02-02 17:35:05 -080087 this.logFolder = logFolder;
88 }
89
Madan Jampani3a9911c2016-02-21 11:25:45 -080090 /**
91 * Returns the partition client instance.
92 * @return client
93 */
Madan Jampani15b8ef52016-02-02 17:35:05 -080094 public StoragePartitionClient client() {
95 return client;
96 }
97
98 @Override
99 public CompletableFuture<Void> open() {
Madan Jampanif172d402016-03-04 00:56:38 -0800100 if (partition.getMembers().contains(localNodeId)) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700101 return openServer()
102 .thenCompose(v -> openClient())
103 .thenAccept(v -> isOpened.set(true))
104 .thenApply(v -> null);
Madan Jampanif172d402016-03-04 00:56:38 -0800105 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700106 return openClient()
107 .thenAccept(v -> isOpened.set(true))
108 .thenApply(v -> null);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800109 }
110
111 @Override
112 public CompletableFuture<Void> close() {
Madan Jampani33547452016-02-29 16:45:04 -0800113 // We do not explicitly close the server and instead let the cluster
114 // deal with this as an unclean exit.
Madan Jampani65f24bb2016-03-15 15:16:18 -0700115 return closeClient();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800116 }
117
Madan Jampani33547452016-02-29 16:45:04 -0800118 /**
119 * Returns the identifier of the {@link Partition partition} associated with this instance.
120 * @return partition identifier
121 */
122 public PartitionId getId() {
123 return partition.getId();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800124 }
125
Madan Jampani33547452016-02-29 16:45:04 -0800126 /**
127 * Returns the identifiers of partition members.
128 * @return partition member instance ids
129 */
130 public Collection<NodeId> getMembers() {
131 return partition.getMembers();
132 }
133
134 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700135 * Returns the {@link MemberId identifiers} of partition members.
136 * @return partition member identifiers
Madan Jampani33547452016-02-29 16:45:04 -0800137 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700138 public Collection<MemberId> getMemberIds() {
139 return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
Madan Jampani33547452016-02-29 16:45:04 -0800140 }
141
Madan Jampanif172d402016-03-04 00:56:38 -0800142 /**
143 * Attempts to rejoin the partition.
144 * @return future that is completed after the operation is complete
145 */
Madan Jampani33547452016-02-29 16:45:04 -0800146 private CompletableFuture<Void> openServer() {
147 if (!partition.getMembers().contains(localNodeId) || server != null) {
Madan Jampani15b8ef52016-02-02 17:35:05 -0800148 return CompletableFuture.completedFuture(null);
149 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700150 StoragePartitionServer server = new StoragePartitionServer(this,
151 MemberId.from(localNodeId.id()),
152 () -> new RaftServerCommunicator(
153 partition.getId(),
154 Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
155 clusterCommunicator),
Madan Jampani15b8ef52016-02-02 17:35:05 -0800156 logFolder);
Madan Jampani33547452016-02-29 16:45:04 -0800157 return server.open().thenRun(() -> this.server = server);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800158 }
159
Madan Jampanif172d402016-03-04 00:56:38 -0800160 /**
161 * Attempts to join the partition as a new member.
162 * @return future that is completed after the operation is complete
163 */
164 private CompletableFuture<Void> joinCluster() {
165 Set<NodeId> otherMembers = partition.getMembers()
166 .stream()
167 .filter(nodeId -> !nodeId.equals(localNodeId))
168 .collect(Collectors.toSet());
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700169 StoragePartitionServer server = new StoragePartitionServer(this,
170 MemberId.from(localNodeId.id()),
171 () -> new RaftServerCommunicator(
172 partition.getId(),
173 Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
174 clusterCommunicator),
Madan Jampanif172d402016-03-04 00:56:38 -0800175 logFolder);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700176 return server.join(Collections2.transform(otherMembers, n -> MemberId.from(n.id())))
177 .thenRun(() -> this.server = server);
Madan Jampanif172d402016-03-04 00:56:38 -0800178 }
179
Madan Jampani15b8ef52016-02-02 17:35:05 -0800180 private CompletableFuture<StoragePartitionClient> openClient() {
181 client = new StoragePartitionClient(this,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700182 MemberId.from(localNodeId.id()),
183 new RaftClientCommunicator(
184 partition.getId(),
185 Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
186 clusterCommunicator));
Madan Jampani15b8ef52016-02-02 17:35:05 -0800187 return client.open().thenApply(v -> client);
188 }
189
Madan Jampani33547452016-02-29 16:45:04 -0800190 /**
191 * Closes the partition server if it was previously opened.
192 * @return future that is completed when the operation completes
193 */
Madan Jampanif172d402016-03-04 00:56:38 -0800194 public CompletableFuture<Void> leaveCluster() {
Madan Jampani33547452016-02-29 16:45:04 -0800195 return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
196 }
197
198 @Override
199 public boolean isOpen() {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700200 return isOpened.get();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800201 }
202
203 private CompletableFuture<Void> closeClient() {
204 if (client != null) {
205 return client.close();
206 }
207 return CompletableFuture.completedFuture(null);
208 }
209
Madan Jampanie14a09c2016-02-11 10:43:21 -0800210 /**
211 * Returns the partition information if this partition is locally managed i.e.
212 * this node is a active member of the partition.
213 * @return partition info
214 */
215 public Optional<PartitionInfo> info() {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700216 return server != null && server.isOpen() ? Optional.of(server.info()) : Optional.empty();
Madan Jampani33547452016-02-29 16:45:04 -0800217 }
218
Jon Hall1195afb2016-06-28 18:54:07 -0700219 /**
220 * Process updates to partitions and handles joining or leaving a partition.
221 * @param newValue new Partition
222 */
Madan Jampanif172d402016-03-04 00:56:38 -0800223 public void onUpdate(Partition newValue) {
Jon Hall1195afb2016-06-28 18:54:07 -0700224
225 boolean wasPresent = partition.getMembers().contains(localNodeId);
226 boolean isPresent = newValue.getMembers().contains(localNodeId);
Madan Jampanif172d402016-03-04 00:56:38 -0800227 this.partition = newValue;
Jon Hall1195afb2016-06-28 18:54:07 -0700228 if ((wasPresent && isPresent) || (!wasPresent && !isPresent)) {
229 // no action needed
230 return;
231 }
232 //only need to do action if our membership changed
233 if (wasPresent) {
Madan Jampanif172d402016-03-04 00:56:38 -0800234 leaveCluster();
Jon Hall1195afb2016-06-28 18:54:07 -0700235 } else if (isPresent) {
236 joinCluster();
Madan Jampani33547452016-02-29 16:45:04 -0800237 }
Madan Jampanie14a09c2016-02-11 10:43:21 -0800238 }
Madan Jampani15b8ef52016-02-02 17:35:05 -0800239}