blob: dd49e23d85e37bf3de421bb48bf919566a916ad2 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import io.atomix.catalyst.serializer.Serializer;
19import io.atomix.catalyst.transport.Address;
20import io.atomix.resource.ResourceType;
21import io.atomix.variables.DistributedLong;
22
23import java.io.File;
24import java.util.Collection;
25import java.util.Optional;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.atomic.AtomicBoolean;
28
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.ControllerNode;
31import org.onosproject.cluster.DefaultPartition;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.cluster.Partition;
34import org.onosproject.store.cluster.messaging.MessagingService;
35import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
36
37import com.google.common.collect.Collections2;
38import com.google.common.collect.ImmutableSet;
39
40/**
41 * Storage partition.
42 */
43public class StoragePartition extends DefaultPartition implements Managed<StoragePartition> {
44
45 private final AtomicBoolean isOpened = new AtomicBoolean(false);
46 private final AtomicBoolean isClosed = new AtomicBoolean(false);
47 private final Serializer serializer;
48 private final MessagingService messagingService;
49 private final ClusterService clusterService;
50 private final File logFolder;
51 private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
52 new ResourceType(DistributedLong.class),
53 new ResourceType(AtomixConsistentMap.class));
54
55 private NodeId localNodeId;
56 private Optional<StoragePartitionServer> server = Optional.empty();
57 private StoragePartitionClient client;
58
59 public StoragePartition(Partition partition,
60 MessagingService messagingService,
61 ClusterService clusterService,
62 Serializer serializer,
63 File logFolder) {
64 super(partition);
65 this.messagingService = messagingService;
66 this.clusterService = clusterService;
67 this.localNodeId = clusterService.getLocalNode().id();
68 this.serializer = serializer;
69 this.logFolder = logFolder;
70 }
71
72 public StoragePartitionClient client() {
73 return client;
74 }
75
76 @Override
77 public CompletableFuture<Void> open() {
78 return openServer().thenAccept(s -> server = Optional.of(s))
79 .thenCompose(v-> openClient())
80 .thenAccept(v -> isOpened.set(true))
81 .thenApply(v -> null);
82 }
83
84 @Override
85 public CompletableFuture<Void> close() {
86 return closeClient().thenCompose(v -> closeServer())
87 .thenAccept(v -> isClosed.set(true))
88 .thenApply(v -> null);
89 }
90
91 public Collection<Address> getMemberAddresses() {
92 return Collections2.transform(getMembers(), this::toAddress);
93 }
94
95 private CompletableFuture<StoragePartitionServer> openServer() {
96 if (!getMembers().contains(localNodeId)) {
97 return CompletableFuture.completedFuture(null);
98 }
99 StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
100 this,
101 serializer,
102 () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
103 getId(),
104 messagingService),
105 RESOURCE_TYPES,
106 logFolder);
107 return server.open().thenApply(v -> server);
108 }
109
110 private CompletableFuture<StoragePartitionClient> openClient() {
111 client = new StoragePartitionClient(this,
112 serializer,
113 new CopycatTransport(CopycatTransport.Mode.CLIENT,
114 getId(),
115 messagingService),
116 RESOURCE_TYPES);
117 return client.open().thenApply(v -> client);
118 }
119
120 private CompletableFuture<Void> closeServer() {
121 if (server.isPresent()) {
122 return server.get().close();
123 } else {
124 return CompletableFuture.completedFuture(null);
125 }
126 }
127
128 private CompletableFuture<Void> closeClient() {
129 if (client != null) {
130 return client.close();
131 }
132 return CompletableFuture.completedFuture(null);
133 }
134
135 private Address toAddress(NodeId nodeId) {
136 ControllerNode node = clusterService.getNode(nodeId);
137 return new Address(node.ip().toString(), node.tcpPort());
138 }
139
140 @Override
141 public boolean isOpen() {
142 return !isClosed.get() && isOpened.get();
143 }
144
145 @Override
146 public boolean isClosed() {
147 return isOpened.get() && isClosed.get();
148 }
149}