blob: c19bccd8c8751d681701740135e59341380041ba [file] [log] [blame]
Madan Jampani25461112015-02-17 14:17:29 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Collection;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
25import java.util.concurrent.CopyOnWriteArrayList;
26import java.util.concurrent.atomic.AtomicBoolean;
27import java.util.concurrent.atomic.AtomicInteger;
Madan Jampanif1b8e172015-03-23 11:42:02 -070028import java.util.stream.Collectors;
Madan Jampani393e0f02015-02-12 07:35:39 +053029
Madan Jampani63c659f2015-06-11 00:52:58 -070030import org.onosproject.cluster.NodeId;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070031import org.onosproject.store.service.DatabaseUpdate;
32import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053033import org.onosproject.store.service.Versioned;
34
Madan Jampani94c23532015-02-05 17:40:01 -080035import com.google.common.collect.Lists;
36import com.google.common.collect.Maps;
37import com.google.common.collect.Sets;
38
Madan Jampanif1b8e172015-03-23 11:42:02 -070039import net.kuujo.copycat.Task;
40import net.kuujo.copycat.cluster.Cluster;
Madan Jampanied1b7fc2015-04-27 23:30:07 -070041import net.kuujo.copycat.resource.ResourceState;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080042import static com.google.common.base.Preconditions.checkState;
43
Madan Jampani94c23532015-02-05 17:40:01 -080044/**
45 * A database that partitions the keys across one or more database partitions.
46 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070047public class PartitionedDatabase implements Database {
Madan Jampani94c23532015-02-05 17:40:01 -080048
Madan Jampanif1b8e172015-03-23 11:42:02 -070049 private final String name;
50 private final Partitioner<String> partitioner;
51 private final List<Database> partitions;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080052 private final AtomicBoolean isOpen = new AtomicBoolean(false);
Madan Jampanif1b8e172015-03-23 11:42:02 -070053 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
Madan Jampani50589ac2015-06-08 11:38:46 -070054 private TransactionManager transactionManager;
Madan Jampani94c23532015-02-05 17:40:01 -080055
Madan Jampanif1b8e172015-03-23 11:42:02 -070056 public PartitionedDatabase(
57 String name,
58 Collection<Database> partitions) {
59 this.name = name;
60 this.partitions = partitions
61 .stream()
62 .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
63 .collect(Collectors.toList());
64 this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
65 }
66
67 /**
68 * Returns the databases for individual partitions.
69 * @return list of database partitions
70 */
71 public List<Database> getPartitions() {
72 return partitions;
Madan Jampani94c23532015-02-05 17:40:01 -080073 }
74
Madan Jampani7f72c3f2015-03-01 17:34:59 -080075 /**
76 * Returns true if the database is open.
77 * @return true if open, false otherwise
78 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070079 @Override
Madan Jampani7f72c3f2015-03-01 17:34:59 -080080 public boolean isOpen() {
81 return isOpen.get();
82 }
83
Madan Jampani94c23532015-02-05 17:40:01 -080084 @Override
Madan Jampania89f8f92015-04-01 14:39:54 -070085 public CompletableFuture<Set<String>> tableNames() {
86 checkState(isOpen.get(), DB_NOT_OPEN);
87 Set<String> tableNames = Sets.newConcurrentHashSet();
88 return CompletableFuture.allOf(partitions
89 .stream()
90 .map(db -> db.tableNames().thenApply(tableNames::addAll))
91 .toArray(CompletableFuture[]::new))
92 .thenApply(v -> tableNames);
93 }
94
95 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070096 public CompletableFuture<Map<String, Long>> counters() {
97 checkState(isOpen.get(), DB_NOT_OPEN);
98 Map<String, Long> counters = Maps.newConcurrentMap();
99 return CompletableFuture.allOf(partitions
100 .stream()
101 .map(db -> db.counters()
102 .thenApply(m -> {
103 counters.putAll(m);
104 return null;
105 }))
106 .toArray(CompletableFuture[]::new))
107 .thenApply(v -> counters);
108 }
109
110 @Override
Madan Jampani94c23532015-02-05 17:40:01 -0800111 public CompletableFuture<Integer> size(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800112 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800113 AtomicInteger totalSize = new AtomicInteger(0);
114 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800115 .stream()
116 .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
117 .toArray(CompletableFuture[]::new))
118 .thenApply(v -> totalSize.get());
119 }
120
121 @Override
122 public CompletableFuture<Boolean> isEmpty(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800123 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800124 return size(tableName).thenApply(size -> size == 0);
125 }
126
127 @Override
128 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800129 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800130 return partitioner.getPartition(tableName, key).containsKey(tableName, key);
131 }
132
133 @Override
134 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800135 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800136 AtomicBoolean containsValue = new AtomicBoolean(false);
137 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800138 .stream()
139 .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
140 .toArray(CompletableFuture[]::new))
141 .thenApply(v -> containsValue.get());
142 }
143
144 @Override
145 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800146 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800147 return partitioner.getPartition(tableName, key).get(tableName, key);
148 }
149
150 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700151 public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800152 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800153 return partitioner.getPartition(tableName, key).put(tableName, key, value);
154 }
155
156 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700157 public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putAndGet(String tableName,
158 String key,
159 byte[] value) {
160 checkState(isOpen.get(), DB_NOT_OPEN);
161 return partitioner.getPartition(tableName, key).putAndGet(tableName, key, value);
162 }
163
164 @Override
165 public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putIfAbsentAndGet(String tableName,
166 String key,
167 byte[] value) {
168 checkState(isOpen.get(), DB_NOT_OPEN);
169 return partitioner.getPartition(tableName, key).putIfAbsentAndGet(tableName, key, value);
170 }
171
172 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700173 public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800174 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800175 return partitioner.getPartition(tableName, key).remove(tableName, key);
176 }
177
178 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700179 public CompletableFuture<Result<Void>> clear(String tableName) {
180 AtomicBoolean isLocked = new AtomicBoolean(false);
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800181 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800182 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800183 .stream()
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700184 .map(p -> p.clear(tableName)
185 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
186 .toArray(CompletableFuture[]::new))
187 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
Madan Jampani94c23532015-02-05 17:40:01 -0800188 }
189
190 @Override
191 public CompletableFuture<Set<String>> keySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800192 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800193 Set<String> keySet = Sets.newConcurrentHashSet();
194 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800195 .stream()
196 .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
197 .toArray(CompletableFuture[]::new))
198 .thenApply(v -> keySet);
199 }
200
201 @Override
202 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800203 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800204 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
205 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800206 .stream()
Madan Jampani393e0f02015-02-12 07:35:39 +0530207 .map(p -> p.values(tableName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800208 .toArray(CompletableFuture[]::new))
209 .thenApply(v -> values);
210 }
211
212 @Override
213 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800214 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800215 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
216 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800217 .stream()
218 .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
219 .toArray(CompletableFuture[]::new))
220 .thenApply(v -> entrySet);
221 }
222
223 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700224 public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800225 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800226 return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
227 }
228
229 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700230 public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800231 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800232 return partitioner.getPartition(tableName, key).remove(tableName, key, value);
233 }
234
235 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700236 public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800237 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800238 return partitioner.getPartition(tableName, key).remove(tableName, key, version);
239 }
240
241 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700242 public CompletableFuture<Result<Boolean>> replace(
243 String tableName, String key, byte[] oldValue, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800244 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800245 return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
246 }
247
248 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700249 public CompletableFuture<Result<Boolean>> replace(
250 String tableName, String key, long oldVersion, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800251 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800252 return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
253 }
254
255 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700256 public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> replaceAndGet(
257 String tableName, String key, long oldVersion, byte[] newValue) {
258 checkState(isOpen.get(), DB_NOT_OPEN);
259 return partitioner.getPartition(tableName, key).replaceAndGet(tableName, key, oldVersion, newValue);
260 }
261
262 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700263 public CompletableFuture<Long> counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700264 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700265 return partitioner.getPartition(counterName, counterName).counterGet(counterName);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700266 }
267
268 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700269 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700270 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700271 return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
272 }
273
274 @Override
275 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
276 checkState(isOpen.get(), DB_NOT_OPEN);
277 return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700278 }
279
Madan Jampani63c659f2015-06-11 00:52:58 -0700280
281 @Override
282 public CompletableFuture<Long> queueSize(String queueName) {
283 checkState(isOpen.get(), DB_NOT_OPEN);
284 return partitioner.getPartition(queueName, queueName).queueSize(queueName);
285 }
286
287 @Override
288 public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
289 checkState(isOpen.get(), DB_NOT_OPEN);
290 return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
291 }
292
293 @Override
294 public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
295 checkState(isOpen.get(), DB_NOT_OPEN);
296 return partitioner.getPartition(queueName, queueName).queuePop(queueName, nodeId);
297 }
298
299 @Override
300 public CompletableFuture<byte[]> queuePeek(String queueName) {
301 checkState(isOpen.get(), DB_NOT_OPEN);
302 return partitioner.getPartition(queueName, queueName).queuePeek(queueName);
303 }
304
Madan Jampanib5d72d52015-04-03 16:53:50 -0700305 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700306 public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
307 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
308 if (subTransactions.isEmpty()) {
309 return CompletableFuture.completedFuture(true);
310 } else if (subTransactions.size() == 1) {
311 Entry<Database, Transaction> entry =
312 subTransactions.entrySet().iterator().next();
313 return entry.getKey().prepareAndCommit(entry.getValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800314 } else {
Madan Jampani98166f92015-06-26 15:12:33 -0700315 if (transactionManager == null) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700316 throw new IllegalStateException("TransactionManager is not initialized");
317 }
318 return transactionManager.execute(transaction);
Madan Jampani94c23532015-02-05 17:40:01 -0800319 }
320 }
321
322 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700323 public CompletableFuture<Boolean> prepare(Transaction transaction) {
324 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
325 AtomicBoolean status = new AtomicBoolean(true);
326 return CompletableFuture.allOf(subTransactions.entrySet()
327 .stream()
328 .map(entry -> entry
329 .getKey()
330 .prepare(entry.getValue())
331 .thenApply(v -> status.compareAndSet(true, v)))
332 .toArray(CompletableFuture[]::new))
333 .thenApply(v -> status.get());
334 }
335
336 @Override
337 public CompletableFuture<Boolean> commit(Transaction transaction) {
338 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
339 return CompletableFuture.allOf(subTransactions.entrySet()
340 .stream()
341 .map(entry -> entry.getKey().commit(entry.getValue()))
342 .toArray(CompletableFuture[]::new))
343 .thenApply(v -> true);
344 }
345
346 @Override
347 public CompletableFuture<Boolean> rollback(Transaction transaction) {
348 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
349 return CompletableFuture.allOf(subTransactions.entrySet()
350 .stream()
351 .map(entry -> entry.getKey().rollback(entry.getValue()))
352 .toArray(CompletableFuture[]::new))
353 .thenApply(v -> true);
354 }
355
356 @Override
Madan Jampanif1b8e172015-03-23 11:42:02 -0700357 public CompletableFuture<Database> open() {
358 return CompletableFuture.allOf(partitions
359 .stream()
360 .map(Database::open)
361 .toArray(CompletableFuture[]::new))
362 .thenApply(v -> {
363 isOpen.set(true);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700364 return this;
365 });
Madan Jampani94c23532015-02-05 17:40:01 -0800366 }
367
368 @Override
369 public CompletableFuture<Void> close() {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800370 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700371 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800372 .stream()
373 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800374 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800375 }
Madan Jampanif1b8e172015-03-23 11:42:02 -0700376
377 @Override
378 public boolean isClosed() {
379 return !isOpen.get();
380 }
381
382 @Override
383 public String name() {
384 return name;
385 }
386
387 @Override
388 public Cluster cluster() {
389 throw new UnsupportedOperationException();
390 }
391
392 @Override
393 public Database addStartupTask(Task<CompletableFuture<Void>> task) {
394 throw new UnsupportedOperationException();
395 }
396
397 @Override
398 public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
399 throw new UnsupportedOperationException();
400 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700401
Madan Jampanied1b7fc2015-04-27 23:30:07 -0700402 @Override
403 public ResourceState state() {
404 throw new UnsupportedOperationException();
405 }
406
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700407 private Map<Database, Transaction> createSubTransactions(
408 Transaction transaction) {
409 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
410 for (DatabaseUpdate update : transaction.updates()) {
411 Database partition = partitioner.getPartition(update.tableName(), update.key());
412 List<DatabaseUpdate> partitionUpdates =
413 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
414 partitionUpdates.add(update);
415 }
416 Map<Database, Transaction> subTransactions = Maps.newHashMap();
417 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700418 return subTransactions;
419 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700420
421 protected void setTransactionManager(TransactionManager tranasactionManager) {
422 this.transactionManager = transactionManager;
423 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700424}