blob: bbbd9b61f9ed7a545357f36c725602d6e8e613b6 [file] [log] [blame]
Madan Jampani25461112015-02-17 14:17:29 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Collection;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
25import java.util.concurrent.CopyOnWriteArrayList;
26import java.util.concurrent.atomic.AtomicBoolean;
27import java.util.concurrent.atomic.AtomicInteger;
Madan Jampanif1b8e172015-03-23 11:42:02 -070028import java.util.stream.Collectors;
Madan Jampani393e0f02015-02-12 07:35:39 +053029
Madan Jampanibff6d8f2015-03-31 16:53:47 -070030import org.onosproject.store.service.DatabaseUpdate;
31import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053032import org.onosproject.store.service.Versioned;
33
Madan Jampani94c23532015-02-05 17:40:01 -080034import com.google.common.collect.Lists;
35import com.google.common.collect.Maps;
36import com.google.common.collect.Sets;
37
Madan Jampanif1b8e172015-03-23 11:42:02 -070038import net.kuujo.copycat.Task;
39import net.kuujo.copycat.cluster.Cluster;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080040import static com.google.common.base.Preconditions.checkState;
41
Madan Jampani94c23532015-02-05 17:40:01 -080042/**
43 * A database that partitions the keys across one or more database partitions.
44 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070045public class PartitionedDatabase implements Database {
Madan Jampani94c23532015-02-05 17:40:01 -080046
Madan Jampanif1b8e172015-03-23 11:42:02 -070047 private final String name;
48 private final Partitioner<String> partitioner;
49 private final List<Database> partitions;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080050 private final AtomicBoolean isOpen = new AtomicBoolean(false);
Madan Jampanif1b8e172015-03-23 11:42:02 -070051 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
Madan Jampani94c23532015-02-05 17:40:01 -080052
/**
 * Creates a database that spreads its keys over the supplied partitions.
 * The partitions are ordered by name before being handed to the key-hash
 * partitioner.
 *
 * @param name name of this partitioned database
 * @param partitions databases backing the individual partitions
 */
public PartitionedDatabase(String name, Collection<Database> partitions) {
    List<Database> orderedByName = partitions
            .stream()
            .sorted((left, right) -> left.name().compareTo(right.name()))
            .collect(Collectors.toList());
    this.name = name;
    this.partitions = orderedByName;
    this.partitioner = new SimpleKeyHashPartitioner(orderedByName);
}
63
64 /**
65 * Returns the databases for individual partitions.
66 * @return list of database partitions
67 */
68 public List<Database> getPartitions() {
69 return partitions;
Madan Jampani94c23532015-02-05 17:40:01 -080070 }
71
Madan Jampani7f72c3f2015-03-01 17:34:59 -080072 /**
73 * Returns true if the database is open.
74 * @return true if open, false otherwise
75 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070076 @Override
Madan Jampani7f72c3f2015-03-01 17:34:59 -080077 public boolean isOpen() {
78 return isOpen.get();
79 }
80
Madan Jampani94c23532015-02-05 17:40:01 -080081 @Override
Madan Jampania89f8f92015-04-01 14:39:54 -070082 public CompletableFuture<Set<String>> tableNames() {
83 checkState(isOpen.get(), DB_NOT_OPEN);
84 Set<String> tableNames = Sets.newConcurrentHashSet();
85 return CompletableFuture.allOf(partitions
86 .stream()
87 .map(db -> db.tableNames().thenApply(tableNames::addAll))
88 .toArray(CompletableFuture[]::new))
89 .thenApply(v -> tableNames);
90 }
91
92 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070093 public CompletableFuture<Map<String, Long>> counters() {
94 checkState(isOpen.get(), DB_NOT_OPEN);
95 Map<String, Long> counters = Maps.newConcurrentMap();
96 return CompletableFuture.allOf(partitions
97 .stream()
98 .map(db -> db.counters()
99 .thenApply(m -> {
100 counters.putAll(m);
101 return null;
102 }))
103 .toArray(CompletableFuture[]::new))
104 .thenApply(v -> counters);
105 }
106
107 @Override
Madan Jampani94c23532015-02-05 17:40:01 -0800108 public CompletableFuture<Integer> size(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800109 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800110 AtomicInteger totalSize = new AtomicInteger(0);
111 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800112 .stream()
113 .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
114 .toArray(CompletableFuture[]::new))
115 .thenApply(v -> totalSize.get());
116 }
117
118 @Override
119 public CompletableFuture<Boolean> isEmpty(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800120 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800121 return size(tableName).thenApply(size -> size == 0);
122 }
123
124 @Override
125 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800126 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800127 return partitioner.getPartition(tableName, key).containsKey(tableName, key);
128 }
129
130 @Override
131 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800132 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800133 AtomicBoolean containsValue = new AtomicBoolean(false);
134 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800135 .stream()
136 .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
137 .toArray(CompletableFuture[]::new))
138 .thenApply(v -> containsValue.get());
139 }
140
141 @Override
142 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800143 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800144 return partitioner.getPartition(tableName, key).get(tableName, key);
145 }
146
147 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700148 public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800149 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800150 return partitioner.getPartition(tableName, key).put(tableName, key, value);
151 }
152
153 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700154 public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800155 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800156 return partitioner.getPartition(tableName, key).remove(tableName, key);
157 }
158
159 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700160 public CompletableFuture<Result<Void>> clear(String tableName) {
161 AtomicBoolean isLocked = new AtomicBoolean(false);
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800162 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800163 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800164 .stream()
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700165 .map(p -> p.clear(tableName)
166 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
167 .toArray(CompletableFuture[]::new))
168 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
Madan Jampani94c23532015-02-05 17:40:01 -0800169 }
170
171 @Override
172 public CompletableFuture<Set<String>> keySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800173 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800174 Set<String> keySet = Sets.newConcurrentHashSet();
175 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800176 .stream()
177 .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
178 .toArray(CompletableFuture[]::new))
179 .thenApply(v -> keySet);
180 }
181
182 @Override
183 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800184 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800185 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
186 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800187 .stream()
Madan Jampani393e0f02015-02-12 07:35:39 +0530188 .map(p -> p.values(tableName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800189 .toArray(CompletableFuture[]::new))
190 .thenApply(v -> values);
191 }
192
193 @Override
194 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800195 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800196 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
197 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800198 .stream()
199 .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
200 .toArray(CompletableFuture[]::new))
201 .thenApply(v -> entrySet);
202 }
203
204 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700205 public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800206 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800207 return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
208 }
209
210 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700211 public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800212 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800213 return partitioner.getPartition(tableName, key).remove(tableName, key, value);
214 }
215
216 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700217 public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800218 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800219 return partitioner.getPartition(tableName, key).remove(tableName, key, version);
220 }
221
222 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700223 public CompletableFuture<Result<Boolean>> replace(
224 String tableName, String key, byte[] oldValue, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800225 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800226 return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
227 }
228
229 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700230 public CompletableFuture<Result<Boolean>> replace(
231 String tableName, String key, long oldVersion, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800232 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800233 return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
234 }
235
236 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -0700237 public CompletableFuture<Long> nextValue(String counterName) {
238 checkState(isOpen.get(), DB_NOT_OPEN);
239 return partitioner.getPartition(counterName, counterName).nextValue(counterName);
240 }
241
242 @Override
243 public CompletableFuture<Long> currentValue(String counterName) {
244 checkState(isOpen.get(), DB_NOT_OPEN);
245 return partitioner.getPartition(counterName, counterName).currentValue(counterName);
246 }
247
248 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700249 public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
250 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
251 if (subTransactions.isEmpty()) {
252 return CompletableFuture.completedFuture(true);
253 } else if (subTransactions.size() == 1) {
254 Entry<Database, Transaction> entry =
255 subTransactions.entrySet().iterator().next();
256 return entry.getKey().prepareAndCommit(entry.getValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800257 } else {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700258 return new TransactionManager(this).execute(transaction);
Madan Jampani94c23532015-02-05 17:40:01 -0800259 }
260 }
261
262 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700263 public CompletableFuture<Boolean> prepare(Transaction transaction) {
264 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
265 AtomicBoolean status = new AtomicBoolean(true);
266 return CompletableFuture.allOf(subTransactions.entrySet()
267 .stream()
268 .map(entry -> entry
269 .getKey()
270 .prepare(entry.getValue())
271 .thenApply(v -> status.compareAndSet(true, v)))
272 .toArray(CompletableFuture[]::new))
273 .thenApply(v -> status.get());
274 }
275
276 @Override
277 public CompletableFuture<Boolean> commit(Transaction transaction) {
278 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
279 return CompletableFuture.allOf(subTransactions.entrySet()
280 .stream()
281 .map(entry -> entry.getKey().commit(entry.getValue()))
282 .toArray(CompletableFuture[]::new))
283 .thenApply(v -> true);
284 }
285
286 @Override
287 public CompletableFuture<Boolean> rollback(Transaction transaction) {
288 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
289 return CompletableFuture.allOf(subTransactions.entrySet()
290 .stream()
291 .map(entry -> entry.getKey().rollback(entry.getValue()))
292 .toArray(CompletableFuture[]::new))
293 .thenApply(v -> true);
294 }
295
296 @Override
Madan Jampanif1b8e172015-03-23 11:42:02 -0700297 public CompletableFuture<Database> open() {
298 return CompletableFuture.allOf(partitions
299 .stream()
300 .map(Database::open)
301 .toArray(CompletableFuture[]::new))
302 .thenApply(v -> {
303 isOpen.set(true);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700304 return this;
305 });
Madan Jampani94c23532015-02-05 17:40:01 -0800306 }
307
308 @Override
309 public CompletableFuture<Void> close() {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800310 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700311 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800312 .stream()
313 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800314 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800315 }
Madan Jampanif1b8e172015-03-23 11:42:02 -0700316
317 @Override
318 public boolean isClosed() {
319 return !isOpen.get();
320 }
321
322 @Override
323 public String name() {
324 return name;
325 }
326
327 @Override
328 public Cluster cluster() {
329 throw new UnsupportedOperationException();
330 }
331
332 @Override
333 public Database addStartupTask(Task<CompletableFuture<Void>> task) {
334 throw new UnsupportedOperationException();
335 }
336
337 @Override
338 public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
339 throw new UnsupportedOperationException();
340 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700341
342 private Map<Database, Transaction> createSubTransactions(
343 Transaction transaction) {
344 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
345 for (DatabaseUpdate update : transaction.updates()) {
346 Database partition = partitioner.getPartition(update.tableName(), update.key());
347 List<DatabaseUpdate> partitionUpdates =
348 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
349 partitionUpdates.add(update);
350 }
351 Map<Database, Transaction> subTransactions = Maps.newHashMap();
352 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
353
354 return subTransactions;
355 }
356}