blob: b40e39db8b7ec16629023967771a185d349ba8a5 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import io.atomix.catalyst.serializer.Serializer;
19import io.atomix.catalyst.transport.Address;
20import io.atomix.resource.ResourceType;
21import io.atomix.variables.DistributedLong;
22
23import java.io.File;
24import java.util.Collection;
25import java.util.Optional;
Madan Jampanif172d402016-03-04 00:56:38 -080026import java.util.Set;
Madan Jampani15b8ef52016-02-02 17:35:05 -080027import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampanif172d402016-03-04 00:56:38 -080029import java.util.stream.Collectors;
Madan Jampani15b8ef52016-02-02 17:35:05 -080030
31import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.ControllerNode;
Madan Jampani15b8ef52016-02-02 17:35:05 -080033import org.onosproject.cluster.NodeId;
34import org.onosproject.cluster.Partition;
Madan Jampani33547452016-02-29 16:45:04 -080035import org.onosproject.cluster.PartitionId;
Madan Jampani15b8ef52016-02-02 17:35:05 -080036import org.onosproject.store.cluster.messaging.MessagingService;
37import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
Madan Jampani39fff102016-02-14 13:17:28 -080038import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
Madan Jampanie14a09c2016-02-11 10:43:21 -080039import org.onosproject.store.service.PartitionInfo;
Madan Jampani15b8ef52016-02-02 17:35:05 -080040
41import com.google.common.collect.Collections2;
42import com.google.common.collect.ImmutableSet;
43
44/**
45 * Storage partition.
46 */
Madan Jampani33547452016-02-29 16:45:04 -080047public class StoragePartition implements Managed<StoragePartition> {
Madan Jampani15b8ef52016-02-02 17:35:05 -080048
49 private final AtomicBoolean isOpened = new AtomicBoolean(false);
Madan Jampani15b8ef52016-02-02 17:35:05 -080050 private final Serializer serializer;
51 private final MessagingService messagingService;
52 private final ClusterService clusterService;
53 private final File logFolder;
Madan Jampani33547452016-02-29 16:45:04 -080054 private Partition partition;
Madan Jampani15b8ef52016-02-02 17:35:05 -080055 private NodeId localNodeId;
Madan Jampani33547452016-02-29 16:45:04 -080056 private StoragePartitionServer server;
Madan Jampani15b8ef52016-02-02 17:35:05 -080057 private StoragePartitionClient client;
58
Madan Jampani65f24bb2016-03-15 15:16:18 -070059 public static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
60 new ResourceType(DistributedLong.class),
61 new ResourceType(AtomixLeaderElector.class),
62 new ResourceType(AtomixConsistentMap.class));
63
Madan Jampani15b8ef52016-02-02 17:35:05 -080064 public StoragePartition(Partition partition,
65 MessagingService messagingService,
66 ClusterService clusterService,
67 Serializer serializer,
68 File logFolder) {
Madan Jampani33547452016-02-29 16:45:04 -080069 this.partition = partition;
Madan Jampani15b8ef52016-02-02 17:35:05 -080070 this.messagingService = messagingService;
71 this.clusterService = clusterService;
72 this.localNodeId = clusterService.getLocalNode().id();
73 this.serializer = serializer;
74 this.logFolder = logFolder;
75 }
76
Madan Jampani3a9911c2016-02-21 11:25:45 -080077 /**
78 * Returns the partition client instance.
79 * @return client
80 */
Madan Jampani15b8ef52016-02-02 17:35:05 -080081 public StoragePartitionClient client() {
82 return client;
83 }
84
85 @Override
86 public CompletableFuture<Void> open() {
Madan Jampanif172d402016-03-04 00:56:38 -080087 if (partition.getMembers().contains(localNodeId)) {
88 openServer();
89 }
Madan Jampanic94b4852016-02-23 18:18:37 -080090 return openClient().thenAccept(v -> isOpened.set(true))
Madan Jampani15b8ef52016-02-02 17:35:05 -080091 .thenApply(v -> null);
92 }
93
94 @Override
95 public CompletableFuture<Void> close() {
Madan Jampani33547452016-02-29 16:45:04 -080096 // We do not explicitly close the server and instead let the cluster
97 // deal with this as an unclean exit.
Madan Jampani65f24bb2016-03-15 15:16:18 -070098 return closeClient();
Madan Jampani15b8ef52016-02-02 17:35:05 -080099 }
100
Madan Jampani33547452016-02-29 16:45:04 -0800101 /**
102 * Returns the identifier of the {@link Partition partition} associated with this instance.
103 * @return partition identifier
104 */
105 public PartitionId getId() {
106 return partition.getId();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800107 }
108
Madan Jampani33547452016-02-29 16:45:04 -0800109 /**
110 * Returns the identifiers of partition members.
111 * @return partition member instance ids
112 */
113 public Collection<NodeId> getMembers() {
114 return partition.getMembers();
115 }
116
117 /**
118 * Returns the {@link Address addresses} of partition members.
119 * @return partition member addresses
120 */
121 public Collection<Address> getMemberAddresses() {
122 return Collections2.transform(partition.getMembers(), this::toAddress);
123 }
124
Madan Jampanif172d402016-03-04 00:56:38 -0800125 /**
126 * Attempts to rejoin the partition.
127 * @return future that is completed after the operation is complete
128 */
Madan Jampani33547452016-02-29 16:45:04 -0800129 private CompletableFuture<Void> openServer() {
130 if (!partition.getMembers().contains(localNodeId) || server != null) {
Madan Jampani15b8ef52016-02-02 17:35:05 -0800131 return CompletableFuture.completedFuture(null);
132 }
133 StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
134 this,
135 serializer,
136 () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
Madan Jampani33547452016-02-29 16:45:04 -0800137 partition.getId(),
Madan Jampani15b8ef52016-02-02 17:35:05 -0800138 messagingService),
Madan Jampani15b8ef52016-02-02 17:35:05 -0800139 logFolder);
Madan Jampani33547452016-02-29 16:45:04 -0800140 return server.open().thenRun(() -> this.server = server);
Madan Jampani15b8ef52016-02-02 17:35:05 -0800141 }
142
Madan Jampanif172d402016-03-04 00:56:38 -0800143 /**
144 * Attempts to join the partition as a new member.
145 * @return future that is completed after the operation is complete
146 */
147 private CompletableFuture<Void> joinCluster() {
148 Set<NodeId> otherMembers = partition.getMembers()
149 .stream()
150 .filter(nodeId -> !nodeId.equals(localNodeId))
151 .collect(Collectors.toSet());
152 StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
153 this,
154 serializer,
155 () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
156 partition.getId(),
157 messagingService),
Madan Jampanif172d402016-03-04 00:56:38 -0800158 logFolder);
159 return server.join(Collections2.transform(otherMembers, this::toAddress)).thenRun(() -> this.server = server);
160 }
161
Madan Jampani15b8ef52016-02-02 17:35:05 -0800162 private CompletableFuture<StoragePartitionClient> openClient() {
163 client = new StoragePartitionClient(this,
164 serializer,
165 new CopycatTransport(CopycatTransport.Mode.CLIENT,
Madan Jampani33547452016-02-29 16:45:04 -0800166 partition.getId(),
Madan Jampani65f24bb2016-03-15 15:16:18 -0700167 messagingService));
Madan Jampani15b8ef52016-02-02 17:35:05 -0800168 return client.open().thenApply(v -> client);
169 }
170
Madan Jampani33547452016-02-29 16:45:04 -0800171 /**
172 * Closes the partition server if it was previously opened.
173 * @return future that is completed when the operation completes
174 */
Madan Jampanif172d402016-03-04 00:56:38 -0800175 public CompletableFuture<Void> leaveCluster() {
Madan Jampani33547452016-02-29 16:45:04 -0800176 return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
177 }
178
179 @Override
180 public boolean isOpen() {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700181 return isOpened.get();
Madan Jampani15b8ef52016-02-02 17:35:05 -0800182 }
183
184 private CompletableFuture<Void> closeClient() {
185 if (client != null) {
186 return client.close();
187 }
188 return CompletableFuture.completedFuture(null);
189 }
190
191 private Address toAddress(NodeId nodeId) {
192 ControllerNode node = clusterService.getNode(nodeId);
193 return new Address(node.ip().toString(), node.tcpPort());
194 }
195
Madan Jampanie14a09c2016-02-11 10:43:21 -0800196 /**
197 * Returns the partition information if this partition is locally managed i.e.
198 * this node is a active member of the partition.
199 * @return partition info
200 */
201 public Optional<PartitionInfo> info() {
Madan Jampani65f24bb2016-03-15 15:16:18 -0700202 return server != null && server.isOpen() ? Optional.of(server.info()) : Optional.empty();
Madan Jampani33547452016-02-29 16:45:04 -0800203 }
204
Madan Jampanif172d402016-03-04 00:56:38 -0800205 public void onUpdate(Partition newValue) {
206 if (partition.getMembers().contains(localNodeId) && newValue.getMembers().contains(localNodeId)) {
207 return;
208 }
209 if (!partition.getMembers().contains(localNodeId) && !newValue.getMembers().contains(localNodeId)) {
210 return;
211 }
212 this.partition = newValue;
Madan Jampani33547452016-02-29 16:45:04 -0800213 if (partition.getMembers().contains(localNodeId)) {
Madan Jampanif172d402016-03-04 00:56:38 -0800214 joinCluster();
Madan Jampani33547452016-02-29 16:45:04 -0800215 } else if (!partition.getMembers().contains(localNodeId)) {
Madan Jampanif172d402016-03-04 00:56:38 -0800216 leaveCluster();
Madan Jampani33547452016-02-29 16:45:04 -0800217 }
Madan Jampanie14a09c2016-02-11 10:43:21 -0800218 }
Madan Jampani15b8ef52016-02-02 17:35:05 -0800219}