blob: 04e5e5cd3342e1f5f7211fa77a3938524c183a39 [file] [log] [blame]
Madan Jampani25461112015-02-17 14:17:29 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Collection;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
25import java.util.concurrent.CopyOnWriteArrayList;
26import java.util.concurrent.atomic.AtomicBoolean;
27import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani393e0f02015-02-12 07:35:39 +053028
29import org.onosproject.store.service.UpdateOperation;
30import org.onosproject.store.service.Versioned;
31
Madan Jampani94c23532015-02-05 17:40:01 -080032import com.google.common.collect.ImmutableMap;
33import com.google.common.collect.Lists;
34import com.google.common.collect.Maps;
35import com.google.common.collect.Sets;
36
37import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
38
39/**
40 * A database that partitions the keys across one or more database partitions.
41 */
42public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
43
44 private Partitioner<String> partitioner;
45 private final ClusterCoordinator coordinator;
46 private final Map<String, Database> partitions = Maps.newConcurrentMap();
47
48 protected PartitionedDatabase(ClusterCoordinator coordinator) {
49 this.coordinator = coordinator;
50 }
51
52 @Override
53 public void registerPartition(String name, Database partition) {
54 partitions.put(name, partition);
55 }
56
57 @Override
58 public Map<String, Database> getRegisteredPartitions() {
59 return ImmutableMap.copyOf(partitions);
60 }
61
62 @Override
63 public CompletableFuture<Integer> size(String tableName) {
64 AtomicInteger totalSize = new AtomicInteger(0);
65 return CompletableFuture.allOf(partitions
66 .values()
67 .stream()
68 .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
69 .toArray(CompletableFuture[]::new))
70 .thenApply(v -> totalSize.get());
71 }
72
73 @Override
74 public CompletableFuture<Boolean> isEmpty(String tableName) {
75 return size(tableName).thenApply(size -> size == 0);
76 }
77
78 @Override
79 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
80 return partitioner.getPartition(tableName, key).containsKey(tableName, key);
81 }
82
83 @Override
84 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
85 AtomicBoolean containsValue = new AtomicBoolean(false);
86 return CompletableFuture.allOf(partitions
87 .values()
88 .stream()
89 .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
90 .toArray(CompletableFuture[]::new))
91 .thenApply(v -> containsValue.get());
92 }
93
94 @Override
95 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
96 return partitioner.getPartition(tableName, key).get(tableName, key);
97 }
98
99 @Override
100 public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
101 return partitioner.getPartition(tableName, key).put(tableName, key, value);
102 }
103
104 @Override
105 public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
106 return partitioner.getPartition(tableName, key).remove(tableName, key);
107 }
108
109 @Override
110 public CompletableFuture<Void> clear(String tableName) {
111 return CompletableFuture.allOf(partitions
112 .values()
113 .stream()
114 .map(p -> p.clear(tableName))
115 .toArray(CompletableFuture[]::new));
116 }
117
118 @Override
119 public CompletableFuture<Set<String>> keySet(String tableName) {
120 Set<String> keySet = Sets.newConcurrentHashSet();
121 return CompletableFuture.allOf(partitions
122 .values()
123 .stream()
124 .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
125 .toArray(CompletableFuture[]::new))
126 .thenApply(v -> keySet);
127 }
128
129 @Override
130 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
131 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
132 return CompletableFuture.allOf(partitions
133 .values()
134 .stream()
Madan Jampani393e0f02015-02-12 07:35:39 +0530135 .map(p -> p.values(tableName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800136 .toArray(CompletableFuture[]::new))
137 .thenApply(v -> values);
138 }
139
140 @Override
141 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
142 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
143 return CompletableFuture.allOf(partitions
144 .values()
145 .stream()
146 .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
147 .toArray(CompletableFuture[]::new))
148 .thenApply(v -> entrySet);
149 }
150
151 @Override
152 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
153 return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
154 }
155
156 @Override
157 public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
158 return partitioner.getPartition(tableName, key).remove(tableName, key, value);
159 }
160
161 @Override
162 public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
163 return partitioner.getPartition(tableName, key).remove(tableName, key, version);
164 }
165
166 @Override
167 public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
168 return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
169 }
170
171 @Override
172 public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
173 return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
174 }
175
176 @Override
177 public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
178 Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
179 for (UpdateOperation<String, byte[]> update : updates) {
180 Database partition = partitioner.getPartition(update.tableName(), update.key());
181 List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
182 if (partitionUpdates == null) {
183 partitionUpdates = Lists.newArrayList();
184 perPartitionUpdates.put(partition, partitionUpdates);
185 }
186 partitionUpdates.add(update);
187 }
188 if (perPartitionUpdates.size() > 1) {
189 // TODO
190 throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
191 } else {
192 Entry<Database, List<UpdateOperation<String, byte[]>>> only =
193 perPartitionUpdates.entrySet().iterator().next();
194 return only.getKey().atomicBatchUpdate(only.getValue());
195 }
196 }
197
198 @Override
199 public void setPartitioner(Partitioner<String> partitioner) {
200 this.partitioner = partitioner;
201 }
202
203 @Override
204 public CompletableFuture<PartitionedDatabase> open() {
205 return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
206 .values()
207 .stream()
208 .map(Database::open)
Madan Jampani09342702015-02-05 23:32:40 -0800209 .toArray(CompletableFuture[]::new))
Madan Jampani94c23532015-02-05 17:40:01 -0800210 .thenApply(v -> this));
211
212 }
213
214 @Override
215 public CompletableFuture<Void> close() {
216 CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
217 .values()
218 .stream()
219 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800220 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800221 CompletableFuture<Void> closeCoordinator = coordinator.close();
222 return closePartitions.thenCompose(v -> closeCoordinator);
223 }
224}