blob: 2903e0aa8e3bf425e76e76de873b911004535518 [file] [log] [blame]
Madan Jampani25461112015-02-17 14:17:29 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Collection;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
25import java.util.concurrent.CopyOnWriteArrayList;
26import java.util.concurrent.atomic.AtomicBoolean;
27import java.util.concurrent.atomic.AtomicInteger;
Madan Jampanif1b8e172015-03-23 11:42:02 -070028import java.util.stream.Collectors;
Madan Jampani393e0f02015-02-12 07:35:39 +053029
Madan Jampanibff6d8f2015-03-31 16:53:47 -070030import org.onosproject.store.service.DatabaseUpdate;
31import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053032import org.onosproject.store.service.Versioned;
33
Madan Jampani94c23532015-02-05 17:40:01 -080034import com.google.common.collect.Lists;
35import com.google.common.collect.Maps;
36import com.google.common.collect.Sets;
37
Madan Jampanif1b8e172015-03-23 11:42:02 -070038import net.kuujo.copycat.Task;
39import net.kuujo.copycat.cluster.Cluster;
Madan Jampanied1b7fc2015-04-27 23:30:07 -070040import net.kuujo.copycat.resource.ResourceState;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080041import static com.google.common.base.Preconditions.checkState;
42
Madan Jampani94c23532015-02-05 17:40:01 -080043/**
44 * A database that partitions the keys across one or more database partitions.
45 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070046public class PartitionedDatabase implements Database {
Madan Jampani94c23532015-02-05 17:40:01 -080047
Madan Jampanif1b8e172015-03-23 11:42:02 -070048 private final String name;
49 private final Partitioner<String> partitioner;
50 private final List<Database> partitions;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080051 private final AtomicBoolean isOpen = new AtomicBoolean(false);
Madan Jampanif1b8e172015-03-23 11:42:02 -070052 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
Madan Jampani94c23532015-02-05 17:40:01 -080053
Madan Jampanif1b8e172015-03-23 11:42:02 -070054 public PartitionedDatabase(
55 String name,
56 Collection<Database> partitions) {
57 this.name = name;
58 this.partitions = partitions
59 .stream()
60 .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
61 .collect(Collectors.toList());
62 this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
63 }
64
65 /**
66 * Returns the databases for individual partitions.
67 * @return list of database partitions
68 */
69 public List<Database> getPartitions() {
70 return partitions;
Madan Jampani94c23532015-02-05 17:40:01 -080071 }
72
Madan Jampani7f72c3f2015-03-01 17:34:59 -080073 /**
74 * Returns true if the database is open.
75 * @return true if open, false otherwise
76 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070077 @Override
Madan Jampani7f72c3f2015-03-01 17:34:59 -080078 public boolean isOpen() {
79 return isOpen.get();
80 }
81
Madan Jampani94c23532015-02-05 17:40:01 -080082 @Override
Madan Jampania89f8f92015-04-01 14:39:54 -070083 public CompletableFuture<Set<String>> tableNames() {
84 checkState(isOpen.get(), DB_NOT_OPEN);
85 Set<String> tableNames = Sets.newConcurrentHashSet();
86 return CompletableFuture.allOf(partitions
87 .stream()
88 .map(db -> db.tableNames().thenApply(tableNames::addAll))
89 .toArray(CompletableFuture[]::new))
90 .thenApply(v -> tableNames);
91 }
92
93 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070094 public CompletableFuture<Map<String, Long>> counters() {
95 checkState(isOpen.get(), DB_NOT_OPEN);
96 Map<String, Long> counters = Maps.newConcurrentMap();
97 return CompletableFuture.allOf(partitions
98 .stream()
99 .map(db -> db.counters()
100 .thenApply(m -> {
101 counters.putAll(m);
102 return null;
103 }))
104 .toArray(CompletableFuture[]::new))
105 .thenApply(v -> counters);
106 }
107
108 @Override
Madan Jampani94c23532015-02-05 17:40:01 -0800109 public CompletableFuture<Integer> size(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800110 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800111 AtomicInteger totalSize = new AtomicInteger(0);
112 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800113 .stream()
114 .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
115 .toArray(CompletableFuture[]::new))
116 .thenApply(v -> totalSize.get());
117 }
118
119 @Override
120 public CompletableFuture<Boolean> isEmpty(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800121 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800122 return size(tableName).thenApply(size -> size == 0);
123 }
124
125 @Override
126 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800127 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800128 return partitioner.getPartition(tableName, key).containsKey(tableName, key);
129 }
130
131 @Override
132 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800133 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800134 AtomicBoolean containsValue = new AtomicBoolean(false);
135 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800136 .stream()
137 .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
138 .toArray(CompletableFuture[]::new))
139 .thenApply(v -> containsValue.get());
140 }
141
142 @Override
143 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800144 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800145 return partitioner.getPartition(tableName, key).get(tableName, key);
146 }
147
148 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700149 public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800150 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800151 return partitioner.getPartition(tableName, key).put(tableName, key, value);
152 }
153
154 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700155 public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putAndGet(String tableName,
156 String key,
157 byte[] value) {
158 checkState(isOpen.get(), DB_NOT_OPEN);
159 return partitioner.getPartition(tableName, key).putAndGet(tableName, key, value);
160 }
161
162 @Override
163 public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putIfAbsentAndGet(String tableName,
164 String key,
165 byte[] value) {
166 checkState(isOpen.get(), DB_NOT_OPEN);
167 return partitioner.getPartition(tableName, key).putIfAbsentAndGet(tableName, key, value);
168 }
169
170 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700171 public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800172 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800173 return partitioner.getPartition(tableName, key).remove(tableName, key);
174 }
175
176 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700177 public CompletableFuture<Result<Void>> clear(String tableName) {
178 AtomicBoolean isLocked = new AtomicBoolean(false);
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800179 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800180 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800181 .stream()
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700182 .map(p -> p.clear(tableName)
183 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
184 .toArray(CompletableFuture[]::new))
185 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
Madan Jampani94c23532015-02-05 17:40:01 -0800186 }
187
188 @Override
189 public CompletableFuture<Set<String>> keySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800190 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800191 Set<String> keySet = Sets.newConcurrentHashSet();
192 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800193 .stream()
194 .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
195 .toArray(CompletableFuture[]::new))
196 .thenApply(v -> keySet);
197 }
198
199 @Override
200 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800201 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800202 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
203 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800204 .stream()
Madan Jampani393e0f02015-02-12 07:35:39 +0530205 .map(p -> p.values(tableName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800206 .toArray(CompletableFuture[]::new))
207 .thenApply(v -> values);
208 }
209
210 @Override
211 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800212 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800213 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
214 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800215 .stream()
216 .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
217 .toArray(CompletableFuture[]::new))
218 .thenApply(v -> entrySet);
219 }
220
221 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700222 public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800223 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800224 return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
225 }
226
227 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700228 public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800229 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800230 return partitioner.getPartition(tableName, key).remove(tableName, key, value);
231 }
232
233 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700234 public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800235 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800236 return partitioner.getPartition(tableName, key).remove(tableName, key, version);
237 }
238
239 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700240 public CompletableFuture<Result<Boolean>> replace(
241 String tableName, String key, byte[] oldValue, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800242 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800243 return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
244 }
245
246 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700247 public CompletableFuture<Result<Boolean>> replace(
248 String tableName, String key, long oldVersion, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800249 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800250 return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
251 }
252
253 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700254 public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> replaceAndGet(
255 String tableName, String key, long oldVersion, byte[] newValue) {
256 checkState(isOpen.get(), DB_NOT_OPEN);
257 return partitioner.getPartition(tableName, key).replaceAndGet(tableName, key, oldVersion, newValue);
258 }
259
260 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700261 public CompletableFuture<Long> counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700262 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700263 return partitioner.getPartition(counterName, counterName).counterGet(counterName);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700264 }
265
266 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700267 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700268 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700269 return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
270 }
271
272 @Override
273 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
274 checkState(isOpen.get(), DB_NOT_OPEN);
275 return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700276 }
277
278 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700279 public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
280 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
281 if (subTransactions.isEmpty()) {
282 return CompletableFuture.completedFuture(true);
283 } else if (subTransactions.size() == 1) {
284 Entry<Database, Transaction> entry =
285 subTransactions.entrySet().iterator().next();
286 return entry.getKey().prepareAndCommit(entry.getValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800287 } else {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700288 return new TransactionManager(this).execute(transaction);
Madan Jampani94c23532015-02-05 17:40:01 -0800289 }
290 }
291
292 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700293 public CompletableFuture<Boolean> prepare(Transaction transaction) {
294 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
295 AtomicBoolean status = new AtomicBoolean(true);
296 return CompletableFuture.allOf(subTransactions.entrySet()
297 .stream()
298 .map(entry -> entry
299 .getKey()
300 .prepare(entry.getValue())
301 .thenApply(v -> status.compareAndSet(true, v)))
302 .toArray(CompletableFuture[]::new))
303 .thenApply(v -> status.get());
304 }
305
306 @Override
307 public CompletableFuture<Boolean> commit(Transaction transaction) {
308 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
309 return CompletableFuture.allOf(subTransactions.entrySet()
310 .stream()
311 .map(entry -> entry.getKey().commit(entry.getValue()))
312 .toArray(CompletableFuture[]::new))
313 .thenApply(v -> true);
314 }
315
316 @Override
317 public CompletableFuture<Boolean> rollback(Transaction transaction) {
318 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
319 return CompletableFuture.allOf(subTransactions.entrySet()
320 .stream()
321 .map(entry -> entry.getKey().rollback(entry.getValue()))
322 .toArray(CompletableFuture[]::new))
323 .thenApply(v -> true);
324 }
325
326 @Override
Madan Jampanif1b8e172015-03-23 11:42:02 -0700327 public CompletableFuture<Database> open() {
328 return CompletableFuture.allOf(partitions
329 .stream()
330 .map(Database::open)
331 .toArray(CompletableFuture[]::new))
332 .thenApply(v -> {
333 isOpen.set(true);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700334 return this;
335 });
Madan Jampani94c23532015-02-05 17:40:01 -0800336 }
337
338 @Override
339 public CompletableFuture<Void> close() {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800340 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700341 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800342 .stream()
343 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800344 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800345 }
Madan Jampanif1b8e172015-03-23 11:42:02 -0700346
347 @Override
348 public boolean isClosed() {
349 return !isOpen.get();
350 }
351
352 @Override
353 public String name() {
354 return name;
355 }
356
357 @Override
358 public Cluster cluster() {
359 throw new UnsupportedOperationException();
360 }
361
362 @Override
363 public Database addStartupTask(Task<CompletableFuture<Void>> task) {
364 throw new UnsupportedOperationException();
365 }
366
367 @Override
368 public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
369 throw new UnsupportedOperationException();
370 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700371
Madan Jampanied1b7fc2015-04-27 23:30:07 -0700372 @Override
373 public ResourceState state() {
374 throw new UnsupportedOperationException();
375 }
376
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700377 private Map<Database, Transaction> createSubTransactions(
378 Transaction transaction) {
379 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
380 for (DatabaseUpdate update : transaction.updates()) {
381 Database partition = partitioner.getPartition(update.tableName(), update.key());
382 List<DatabaseUpdate> partitionUpdates =
383 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
384 partitionUpdates.add(update);
385 }
386 Map<Database, Transaction> subTransactions = Maps.newHashMap();
387 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700388 return subTransactions;
389 }
390}