blob: 7ed275ab4df8919abf6774771de39a3921211cd2 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.impl;

import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.resource.ResourceType;

import java.io.File;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
import org.onosproject.store.service.PartitionInfo;

import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableSet;

43/**
44 * Storage partition.
45 */
Madan Jampani33547452016-02-29 16:45:04 -080046public class StoragePartition implements Managed<StoragePartition> {
Madan Jampani15b8ef52016-02-02 17:35:05 -080047
48 private final AtomicBoolean isOpened = new AtomicBoolean(false);
Madan Jampani15b8ef52016-02-02 17:35:05 -080049 private final Serializer serializer;
50 private final MessagingService messagingService;
51 private final ClusterService clusterService;
52 private final File logFolder;
Madan Jampani33547452016-02-29 16:45:04 -080053 private Partition partition;
Madan Jampani15b8ef52016-02-02 17:35:05 -080054 private NodeId localNodeId;
Madan Jampani33547452016-02-29 16:45:04 -080055 private StoragePartitionServer server;
Madan Jampani15b8ef52016-02-02 17:35:05 -080056 private StoragePartitionClient client;
57
Madan Jampani65f24bb2016-03-15 15:16:18 -070058 public static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
Madan Jampani65f24bb2016-03-15 15:16:18 -070059 new ResourceType(AtomixLeaderElector.class),
60 new ResourceType(AtomixConsistentMap.class));
61
Madan Jampani15b8ef52016-02-02 17:35:05 -080062 public StoragePartition(Partition partition,
63 MessagingService messagingService,
64 ClusterService clusterService,
65 Serializer serializer,
66 File logFolder) {
Madan Jampani33547452016-02-29 16:45:04 -080067 this.partition = partition;
Madan Jampani15b8ef52016-02-02 17:35:05 -080068 this.messagingService = messagingService;
69 this.clusterService = clusterService;
70 this.localNodeId = clusterService.getLocalNode().id();
71 this.serializer = serializer;
72 this.logFolder = logFolder;
73 }
74
Madan Jampani3a9911c2016-02-21 11:25:45 -080075 /**
76 * Returns the partition client instance.
77 * @return client
78 */
Madan Jampani15b8ef52016-02-02 17:35:05 -080079 public StoragePartitionClient client() {
80 return client;
81 }
82
83 @Override
84 public CompletableFuture<Void> open() {
Madan Jampanif172d402016-03-04 00:56:38 -080085 if (partition.getMembers().contains(localNodeId)) {
86 openServer();
87 }
Madan Jampanic94b4852016-02-23 18:18:37 -080088 return openClient().thenAccept(v -> isOpened.set(true))
Madan Jampani15b8ef52016-02-02 17:35:05 -080089 .thenApply(v -> null);
90 }
91
92 @Override
93 public CompletableFuture<Void> close() {
Madan Jampani33547452016-02-29 16:45:04 -080094 // We do not explicitly close the server and instead let the cluster
95 // deal with this as an unclean exit.
Madan Jampani65f24bb2016-03-15 15:16:18 -070096 return closeClient();
Madan Jampani15b8ef52016-02-02 17:35:05 -080097 }
98
Madan Jampani33547452016-02-29 16:45:04 -080099 /**
100 * Returns the identifier of the {@link Partition partition} associated with this instance.
101 * @return partition identifier
102 */
103 public PartitionId getId() {
104 return partition.getId();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800105 }
106
Madan Jampani33547452016-02-29 16:45:04 -0800107 /**
108 * Returns the identifiers of partition members.
109 * @return partition member instance ids
110 */
111 public Collection<NodeId> getMembers() {
112 return partition.getMembers();
113 }
114
115 /**
116 * Returns the {@link Address addresses} of partition members.
117 * @return partition member addresses
118 */
119 public Collection<Address> getMemberAddresses() {
120 return Collections2.transform(partition.getMembers(), this::toAddress);
121 }
122
Madan Jampanif172d402016-03-04 00:56:38 -0800123 /**
124 * Attempts to rejoin the partition.
125 * @return future that is completed after the operation is complete
126 */
Madan Jampani33547452016-02-29 16:45:04 -0800127 private CompletableFuture<Void> openServer() {
128 if (!partition.getMembers().contains(localNodeId) || server != null) {
Madan Jampani15b8ef52016-02-02 17:35:05 -0800129 return CompletableFuture.completedFuture(null);
130 }
131 StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
132 this,
133 serializer,
134 () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
Madan Jampani33547452016-02-29 16:45:04 -0800135 partition.getId(),
Madan Jampani15b8ef52016-02-02 17:35:05 -0800136 messagingService),
Madan Jampani15b8ef52016-02-02 17:35:05 -0800137 logFolder);
Madan Jampani33547452016-02-29 16:45:04 -0800138 return server.open().thenRun(() -> this.server = server);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800139 }
140
Madan Jampanif172d402016-03-04 00:56:38 -0800141 /**
142 * Attempts to join the partition as a new member.
143 * @return future that is completed after the operation is complete
144 */
145 private CompletableFuture<Void> joinCluster() {
146 Set<NodeId> otherMembers = partition.getMembers()
147 .stream()
148 .filter(nodeId -> !nodeId.equals(localNodeId))
149 .collect(Collectors.toSet());
150 StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
151 this,
152 serializer,
153 () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
154 partition.getId(),
155 messagingService),
Madan Jampanif172d402016-03-04 00:56:38 -0800156 logFolder);
157 return server.join(Collections2.transform(otherMembers, this::toAddress)).thenRun(() -> this.server = server);
158 }
159
Madan Jampani15b8ef52016-02-02 17:35:05 -0800160 private CompletableFuture<StoragePartitionClient> openClient() {
161 client = new StoragePartitionClient(this,
162 serializer,
163 new CopycatTransport(CopycatTransport.Mode.CLIENT,
Madan Jampani33547452016-02-29 16:45:04 -0800164 partition.getId(),
Madan Jampani65f24bb2016-03-15 15:16:18 -0700165 messagingService));
Madan Jampani15b8ef52016-02-02 17:35:05 -0800166 return client.open().thenApply(v -> client);
167 }
168
Madan Jampani33547452016-02-29 16:45:04 -0800169 /**
170 * Closes the partition server if it was previously opened.
171 * @return future that is completed when the operation completes
172 */
Madan Jampanif172d402016-03-04 00:56:38 -0800173 public CompletableFuture<Void> leaveCluster() {
Madan Jampani33547452016-02-29 16:45:04 -0800174 return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
175 }
176
177 @Override
178 public boolean isOpen() {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700179 return isOpened.get();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800180 }
181
182 private CompletableFuture<Void> closeClient() {
183 if (client != null) {
184 return client.close();
185 }
186 return CompletableFuture.completedFuture(null);
187 }
188
189 private Address toAddress(NodeId nodeId) {
190 ControllerNode node = clusterService.getNode(nodeId);
191 return new Address(node.ip().toString(), node.tcpPort());
192 }
193
Madan Jampanie14a09c2016-02-11 10:43:21 -0800194 /**
195 * Returns the partition information if this partition is locally managed i.e.
196 * this node is a active member of the partition.
197 * @return partition info
198 */
199 public Optional<PartitionInfo> info() {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700200 return server != null && server.isOpen() ? Optional.of(server.info()) : Optional.empty();
Madan Jampani33547452016-02-29 16:45:04 -0800201 }
202
Jon Hall1195afb2016-06-28 18:54:07 -0700203 /**
204 * Process updates to partitions and handles joining or leaving a partition.
205 * @param newValue new Partition
206 */
Madan Jampanif172d402016-03-04 00:56:38 -0800207 public void onUpdate(Partition newValue) {
Jon Hall1195afb2016-06-28 18:54:07 -0700208
209 boolean wasPresent = partition.getMembers().contains(localNodeId);
210 boolean isPresent = newValue.getMembers().contains(localNodeId);
Madan Jampanif172d402016-03-04 00:56:38 -0800211 this.partition = newValue;
Jon Hall1195afb2016-06-28 18:54:07 -0700212 if ((wasPresent && isPresent) || (!wasPresent && !isPresent)) {
213 // no action needed
214 return;
215 }
216 //only need to do action if our membership changed
217 if (wasPresent) {
Madan Jampanif172d402016-03-04 00:56:38 -0800218 leaveCluster();
Jon Hall1195afb2016-06-28 18:54:07 -0700219 } else if (isPresent) {
220 joinCluster();
Madan Jampani33547452016-02-29 16:45:04 -0800221 }
Madan Jampanie14a09c2016-02-11 10:43:21 -0800222 }
Madan Jampani15b8ef52016-02-02 17:35:05 -0800223}