blob: 7b279dcdbb2a2299f7ef61abc10eddd962e42f35 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Collection;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
25import java.util.concurrent.CopyOnWriteArrayList;
26import java.util.concurrent.atomic.AtomicBoolean;
27import java.util.concurrent.atomic.AtomicInteger;
Madan Jampanif1b8e172015-03-23 11:42:02 -070028import java.util.stream.Collectors;
Madan Jampani393e0f02015-02-12 07:35:39 +053029
Madan Jampanibff6d8f2015-03-31 16:53:47 -070030import org.onosproject.store.service.DatabaseUpdate;
31import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053032import org.onosproject.store.service.Versioned;
33
Madan Jampani94c23532015-02-05 17:40:01 -080034import com.google.common.collect.Lists;
35import com.google.common.collect.Maps;
36import com.google.common.collect.Sets;
37
Madan Jampanif1b8e172015-03-23 11:42:02 -070038import net.kuujo.copycat.Task;
39import net.kuujo.copycat.cluster.Cluster;
Madan Jampanied1b7fc2015-04-27 23:30:07 -070040import net.kuujo.copycat.resource.ResourceState;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080041import static com.google.common.base.Preconditions.checkState;
42
Madan Jampani94c23532015-02-05 17:40:01 -080043/**
44 * A database that partitions the keys across one or more database partitions.
45 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070046public class PartitionedDatabase implements Database {
Madan Jampani94c23532015-02-05 17:40:01 -080047
Madan Jampanif1b8e172015-03-23 11:42:02 -070048 private final String name;
49 private final Partitioner<String> partitioner;
50 private final List<Database> partitions;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080051 private final AtomicBoolean isOpen = new AtomicBoolean(false);
Madan Jampanif1b8e172015-03-23 11:42:02 -070052 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
Madan Jampani50589ac2015-06-08 11:38:46 -070053 private TransactionManager transactionManager;
Madan Jampani94c23532015-02-05 17:40:01 -080054
Madan Jampanif1b8e172015-03-23 11:42:02 -070055 public PartitionedDatabase(
56 String name,
57 Collection<Database> partitions) {
58 this.name = name;
59 this.partitions = partitions
60 .stream()
61 .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
62 .collect(Collectors.toList());
63 this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
64 }
65
66 /**
67 * Returns the databases for individual partitions.
68 * @return list of database partitions
69 */
70 public List<Database> getPartitions() {
71 return partitions;
Madan Jampani94c23532015-02-05 17:40:01 -080072 }
73
Madan Jampani7f72c3f2015-03-01 17:34:59 -080074 /**
75 * Returns true if the database is open.
76 * @return true if open, false otherwise
77 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070078 @Override
Madan Jampani7f72c3f2015-03-01 17:34:59 -080079 public boolean isOpen() {
80 return isOpen.get();
81 }
82
Madan Jampani94c23532015-02-05 17:40:01 -080083 @Override
Madan Jampania89f8f92015-04-01 14:39:54 -070084 public CompletableFuture<Set<String>> tableNames() {
85 checkState(isOpen.get(), DB_NOT_OPEN);
86 Set<String> tableNames = Sets.newConcurrentHashSet();
87 return CompletableFuture.allOf(partitions
88 .stream()
89 .map(db -> db.tableNames().thenApply(tableNames::addAll))
90 .toArray(CompletableFuture[]::new))
91 .thenApply(v -> tableNames);
92 }
93
94 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070095 public CompletableFuture<Map<String, Long>> counters() {
96 checkState(isOpen.get(), DB_NOT_OPEN);
97 Map<String, Long> counters = Maps.newConcurrentMap();
98 return CompletableFuture.allOf(partitions
99 .stream()
100 .map(db -> db.counters()
101 .thenApply(m -> {
102 counters.putAll(m);
103 return null;
104 }))
105 .toArray(CompletableFuture[]::new))
106 .thenApply(v -> counters);
107 }
108
109 @Override
Madan Jampani94c23532015-02-05 17:40:01 -0800110 public CompletableFuture<Integer> size(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800111 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800112 AtomicInteger totalSize = new AtomicInteger(0);
113 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800114 .stream()
115 .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
116 .toArray(CompletableFuture[]::new))
117 .thenApply(v -> totalSize.get());
118 }
119
120 @Override
121 public CompletableFuture<Boolean> isEmpty(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800122 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800123 return size(tableName).thenApply(size -> size == 0);
124 }
125
126 @Override
127 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800128 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800129 return partitioner.getPartition(tableName, key).containsKey(tableName, key);
130 }
131
132 @Override
133 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800134 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800135 AtomicBoolean containsValue = new AtomicBoolean(false);
136 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800137 .stream()
138 .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
139 .toArray(CompletableFuture[]::new))
140 .thenApply(v -> containsValue.get());
141 }
142
143 @Override
144 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800145 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800146 return partitioner.getPartition(tableName, key).get(tableName, key);
147 }
148
149 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700150 public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800151 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800152 return partitioner.getPartition(tableName, key).put(tableName, key, value);
153 }
154
155 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700156 public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putAndGet(String tableName,
157 String key,
158 byte[] value) {
159 checkState(isOpen.get(), DB_NOT_OPEN);
160 return partitioner.getPartition(tableName, key).putAndGet(tableName, key, value);
161 }
162
163 @Override
164 public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putIfAbsentAndGet(String tableName,
165 String key,
166 byte[] value) {
167 checkState(isOpen.get(), DB_NOT_OPEN);
168 return partitioner.getPartition(tableName, key).putIfAbsentAndGet(tableName, key, value);
169 }
170
171 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700172 public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800173 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800174 return partitioner.getPartition(tableName, key).remove(tableName, key);
175 }
176
177 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700178 public CompletableFuture<Result<Void>> clear(String tableName) {
179 AtomicBoolean isLocked = new AtomicBoolean(false);
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800180 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800181 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800182 .stream()
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700183 .map(p -> p.clear(tableName)
184 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
185 .toArray(CompletableFuture[]::new))
186 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
Madan Jampani94c23532015-02-05 17:40:01 -0800187 }
188
189 @Override
190 public CompletableFuture<Set<String>> keySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800191 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800192 Set<String> keySet = Sets.newConcurrentHashSet();
193 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800194 .stream()
195 .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
196 .toArray(CompletableFuture[]::new))
197 .thenApply(v -> keySet);
198 }
199
200 @Override
201 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800202 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800203 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
204 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800205 .stream()
Madan Jampani393e0f02015-02-12 07:35:39 +0530206 .map(p -> p.values(tableName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800207 .toArray(CompletableFuture[]::new))
208 .thenApply(v -> values);
209 }
210
211 @Override
212 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800213 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800214 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
215 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800216 .stream()
217 .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
218 .toArray(CompletableFuture[]::new))
219 .thenApply(v -> entrySet);
220 }
221
222 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700223 public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800224 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800225 return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
226 }
227
228 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700229 public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800230 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800231 return partitioner.getPartition(tableName, key).remove(tableName, key, value);
232 }
233
234 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700235 public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800236 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800237 return partitioner.getPartition(tableName, key).remove(tableName, key, version);
238 }
239
240 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700241 public CompletableFuture<Result<Boolean>> replace(
242 String tableName, String key, byte[] oldValue, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800243 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800244 return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
245 }
246
247 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700248 public CompletableFuture<Result<Boolean>> replace(
249 String tableName, String key, long oldVersion, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800250 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800251 return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
252 }
253
254 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700255 public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> replaceAndGet(
256 String tableName, String key, long oldVersion, byte[] newValue) {
257 checkState(isOpen.get(), DB_NOT_OPEN);
258 return partitioner.getPartition(tableName, key).replaceAndGet(tableName, key, oldVersion, newValue);
259 }
260
261 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700262 public CompletableFuture<Long> counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700263 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700264 return partitioner.getPartition(counterName, counterName).counterGet(counterName);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700265 }
266
267 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700268 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700269 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700270 return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
271 }
272
273 @Override
274 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
275 checkState(isOpen.get(), DB_NOT_OPEN);
276 return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700277 }
278
279 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700280 public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
281 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
282 if (subTransactions.isEmpty()) {
283 return CompletableFuture.completedFuture(true);
284 } else if (subTransactions.size() == 1) {
285 Entry<Database, Transaction> entry =
286 subTransactions.entrySet().iterator().next();
287 return entry.getKey().prepareAndCommit(entry.getValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800288 } else {
Madan Jampani50589ac2015-06-08 11:38:46 -0700289 if (transactionManager != null) {
290 throw new IllegalStateException("TransactionManager is not initialized");
291 }
292 return transactionManager.execute(transaction);
Madan Jampani94c23532015-02-05 17:40:01 -0800293 }
294 }
295
296 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700297 public CompletableFuture<Boolean> prepare(Transaction transaction) {
298 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
299 AtomicBoolean status = new AtomicBoolean(true);
300 return CompletableFuture.allOf(subTransactions.entrySet()
301 .stream()
302 .map(entry -> entry
303 .getKey()
304 .prepare(entry.getValue())
305 .thenApply(v -> status.compareAndSet(true, v)))
306 .toArray(CompletableFuture[]::new))
307 .thenApply(v -> status.get());
308 }
309
310 @Override
311 public CompletableFuture<Boolean> commit(Transaction transaction) {
312 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
313 return CompletableFuture.allOf(subTransactions.entrySet()
314 .stream()
315 .map(entry -> entry.getKey().commit(entry.getValue()))
316 .toArray(CompletableFuture[]::new))
317 .thenApply(v -> true);
318 }
319
320 @Override
321 public CompletableFuture<Boolean> rollback(Transaction transaction) {
322 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
323 return CompletableFuture.allOf(subTransactions.entrySet()
324 .stream()
325 .map(entry -> entry.getKey().rollback(entry.getValue()))
326 .toArray(CompletableFuture[]::new))
327 .thenApply(v -> true);
328 }
329
330 @Override
Madan Jampanif1b8e172015-03-23 11:42:02 -0700331 public CompletableFuture<Database> open() {
332 return CompletableFuture.allOf(partitions
333 .stream()
334 .map(Database::open)
335 .toArray(CompletableFuture[]::new))
336 .thenApply(v -> {
337 isOpen.set(true);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700338 return this;
339 });
Madan Jampani94c23532015-02-05 17:40:01 -0800340 }
341
342 @Override
343 public CompletableFuture<Void> close() {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800344 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700345 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800346 .stream()
347 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800348 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800349 }
Madan Jampanif1b8e172015-03-23 11:42:02 -0700350
351 @Override
352 public boolean isClosed() {
353 return !isOpen.get();
354 }
355
356 @Override
357 public String name() {
358 return name;
359 }
360
361 @Override
362 public Cluster cluster() {
363 throw new UnsupportedOperationException();
364 }
365
366 @Override
367 public Database addStartupTask(Task<CompletableFuture<Void>> task) {
368 throw new UnsupportedOperationException();
369 }
370
371 @Override
372 public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
373 throw new UnsupportedOperationException();
374 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700375
Madan Jampanied1b7fc2015-04-27 23:30:07 -0700376 @Override
377 public ResourceState state() {
378 throw new UnsupportedOperationException();
379 }
380
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700381 private Map<Database, Transaction> createSubTransactions(
382 Transaction transaction) {
383 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
384 for (DatabaseUpdate update : transaction.updates()) {
385 Database partition = partitioner.getPartition(update.tableName(), update.key());
386 List<DatabaseUpdate> partitionUpdates =
387 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
388 partitionUpdates.add(update);
389 }
390 Map<Database, Transaction> subTransactions = Maps.newHashMap();
391 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700392 return subTransactions;
393 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700394
395 protected void setTransactionManager(TransactionManager tranasactionManager) {
396 this.transactionManager = transactionManager;
397 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700398}