blob: 3b142a7ef939a0c31a8787b614f481e4b7fd690f [file] [log] [blame]
Madan Jampani15b8ef52016-02-02 17:35:05 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import io.atomix.catalyst.serializer.Serializer;
19import io.atomix.catalyst.transport.Address;
20import io.atomix.resource.ResourceType;
21import io.atomix.variables.DistributedLong;
22
23import java.io.File;
24import java.util.Collection;
25import java.util.Optional;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.atomic.AtomicBoolean;
28
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.ControllerNode;
31import org.onosproject.cluster.DefaultPartition;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.cluster.Partition;
34import org.onosproject.store.cluster.messaging.MessagingService;
35import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
Madan Jampani39fff102016-02-14 13:17:28 -080036import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
Madan Jampanie14a09c2016-02-11 10:43:21 -080037import org.onosproject.store.service.PartitionInfo;
Madan Jampani15b8ef52016-02-02 17:35:05 -080038
39import com.google.common.collect.Collections2;
40import com.google.common.collect.ImmutableSet;
41
42/**
43 * Storage partition.
44 */
45public class StoragePartition extends DefaultPartition implements Managed<StoragePartition> {
46
47 private final AtomicBoolean isOpened = new AtomicBoolean(false);
48 private final AtomicBoolean isClosed = new AtomicBoolean(false);
49 private final Serializer serializer;
50 private final MessagingService messagingService;
51 private final ClusterService clusterService;
52 private final File logFolder;
53 private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
54 new ResourceType(DistributedLong.class),
Madan Jampani39fff102016-02-14 13:17:28 -080055 new ResourceType(AtomixLeaderElector.class),
Madan Jampani15b8ef52016-02-02 17:35:05 -080056 new ResourceType(AtomixConsistentMap.class));
57
58 private NodeId localNodeId;
59 private Optional<StoragePartitionServer> server = Optional.empty();
60 private StoragePartitionClient client;
61
62 public StoragePartition(Partition partition,
63 MessagingService messagingService,
64 ClusterService clusterService,
65 Serializer serializer,
66 File logFolder) {
67 super(partition);
68 this.messagingService = messagingService;
69 this.clusterService = clusterService;
70 this.localNodeId = clusterService.getLocalNode().id();
71 this.serializer = serializer;
72 this.logFolder = logFolder;
73 }
74
Madan Jampani3a9911c2016-02-21 11:25:45 -080075 /**
76 * Returns the partition client instance.
77 * @return client
78 */
Madan Jampani15b8ef52016-02-02 17:35:05 -080079 public StoragePartitionClient client() {
80 return client;
81 }
82
Madan Jampani3a9911c2016-02-21 11:25:45 -080083 /**
84 * Returns the optional server instance.
85 * @return server
86 */
87 public Optional<StoragePartitionServer> server() {
88 return server;
89 }
90
Madan Jampani15b8ef52016-02-02 17:35:05 -080091 @Override
92 public CompletableFuture<Void> open() {
Madan Jampani1c9b4e92016-02-17 18:33:11 -080093 return openServer().thenAccept(s -> server = Optional.ofNullable(s))
Madan Jampani15b8ef52016-02-02 17:35:05 -080094 .thenCompose(v-> openClient())
95 .thenAccept(v -> isOpened.set(true))
96 .thenApply(v -> null);
97 }
98
99 @Override
100 public CompletableFuture<Void> close() {
101 return closeClient().thenCompose(v -> closeServer())
102 .thenAccept(v -> isClosed.set(true))
103 .thenApply(v -> null);
104 }
105
106 public Collection<Address> getMemberAddresses() {
107 return Collections2.transform(getMembers(), this::toAddress);
108 }
109
110 private CompletableFuture<StoragePartitionServer> openServer() {
111 if (!getMembers().contains(localNodeId)) {
112 return CompletableFuture.completedFuture(null);
113 }
114 StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
115 this,
116 serializer,
117 () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
118 getId(),
119 messagingService),
120 RESOURCE_TYPES,
121 logFolder);
122 return server.open().thenApply(v -> server);
123 }
124
125 private CompletableFuture<StoragePartitionClient> openClient() {
126 client = new StoragePartitionClient(this,
127 serializer,
128 new CopycatTransport(CopycatTransport.Mode.CLIENT,
129 getId(),
130 messagingService),
131 RESOURCE_TYPES);
132 return client.open().thenApply(v -> client);
133 }
134
135 private CompletableFuture<Void> closeServer() {
Sho SHIMIZUef7e2902016-02-12 18:38:29 -0800136 return server.map(StoragePartitionServer::close)
137 .orElse(CompletableFuture.completedFuture(null));
Madan Jampani15b8ef52016-02-02 17:35:05 -0800138 }
139
140 private CompletableFuture<Void> closeClient() {
141 if (client != null) {
142 return client.close();
143 }
144 return CompletableFuture.completedFuture(null);
145 }
146
147 private Address toAddress(NodeId nodeId) {
148 ControllerNode node = clusterService.getNode(nodeId);
149 return new Address(node.ip().toString(), node.tcpPort());
150 }
151
152 @Override
153 public boolean isOpen() {
154 return !isClosed.get() && isOpened.get();
155 }
156
157 @Override
158 public boolean isClosed() {
159 return isOpened.get() && isClosed.get();
160 }
Madan Jampanie14a09c2016-02-11 10:43:21 -0800161
162 /**
163 * Returns the partition information if this partition is locally managed i.e.
164 * this node is a active member of the partition.
165 * @return partition info
166 */
167 public Optional<PartitionInfo> info() {
168 return server.map(StoragePartitionServer::info);
169 }
Madan Jampani15b8ef52016-02-02 17:35:05 -0800170}