blob: 178f49f8cc6b515879ef563d923ce37da18adce0 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Collection;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
25import java.util.concurrent.CopyOnWriteArrayList;
26import java.util.concurrent.atomic.AtomicBoolean;
27import java.util.concurrent.atomic.AtomicInteger;
Madan Jampanif1b8e172015-03-23 11:42:02 -070028import java.util.stream.Collectors;
Madan Jampani393e0f02015-02-12 07:35:39 +053029
Madan Jampanibff6d8f2015-03-31 16:53:47 -070030import org.onosproject.store.service.DatabaseUpdate;
31import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053032import org.onosproject.store.service.Versioned;
33
Madan Jampani94c23532015-02-05 17:40:01 -080034import com.google.common.collect.Lists;
35import com.google.common.collect.Maps;
36import com.google.common.collect.Sets;
37
Madan Jampanif1b8e172015-03-23 11:42:02 -070038import net.kuujo.copycat.Task;
39import net.kuujo.copycat.cluster.Cluster;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080040import static com.google.common.base.Preconditions.checkState;
41
Madan Jampani94c23532015-02-05 17:40:01 -080042/**
43 * A database that partitions the keys across one or more database partitions.
44 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070045public class PartitionedDatabase implements Database {
Madan Jampani94c23532015-02-05 17:40:01 -080046
Madan Jampanif1b8e172015-03-23 11:42:02 -070047 private final String name;
48 private final Partitioner<String> partitioner;
49 private final List<Database> partitions;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080050 private final AtomicBoolean isOpen = new AtomicBoolean(false);
Madan Jampanif1b8e172015-03-23 11:42:02 -070051 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
Madan Jampani94c23532015-02-05 17:40:01 -080052
Madan Jampanif1b8e172015-03-23 11:42:02 -070053 public PartitionedDatabase(
54 String name,
55 Collection<Database> partitions) {
56 this.name = name;
57 this.partitions = partitions
58 .stream()
59 .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
60 .collect(Collectors.toList());
61 this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
62 }
63
64 /**
65 * Returns the databases for individual partitions.
66 * @return list of database partitions
67 */
68 public List<Database> getPartitions() {
69 return partitions;
Madan Jampani94c23532015-02-05 17:40:01 -080070 }
71
Madan Jampani7f72c3f2015-03-01 17:34:59 -080072 /**
73 * Returns true if the database is open.
74 * @return true if open, false otherwise
75 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070076 @Override
Madan Jampani7f72c3f2015-03-01 17:34:59 -080077 public boolean isOpen() {
78 return isOpen.get();
79 }
80
Madan Jampani94c23532015-02-05 17:40:01 -080081 @Override
Madan Jampania89f8f92015-04-01 14:39:54 -070082 public CompletableFuture<Set<String>> tableNames() {
83 checkState(isOpen.get(), DB_NOT_OPEN);
84 Set<String> tableNames = Sets.newConcurrentHashSet();
85 return CompletableFuture.allOf(partitions
86 .stream()
87 .map(db -> db.tableNames().thenApply(tableNames::addAll))
88 .toArray(CompletableFuture[]::new))
89 .thenApply(v -> tableNames);
90 }
91
92 @Override
Madan Jampani94c23532015-02-05 17:40:01 -080093 public CompletableFuture<Integer> size(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -080094 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -080095 AtomicInteger totalSize = new AtomicInteger(0);
96 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -080097 .stream()
98 .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
99 .toArray(CompletableFuture[]::new))
100 .thenApply(v -> totalSize.get());
101 }
102
103 @Override
104 public CompletableFuture<Boolean> isEmpty(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800105 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800106 return size(tableName).thenApply(size -> size == 0);
107 }
108
109 @Override
110 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800111 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800112 return partitioner.getPartition(tableName, key).containsKey(tableName, key);
113 }
114
115 @Override
116 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800117 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800118 AtomicBoolean containsValue = new AtomicBoolean(false);
119 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800120 .stream()
121 .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
122 .toArray(CompletableFuture[]::new))
123 .thenApply(v -> containsValue.get());
124 }
125
126 @Override
127 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800128 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800129 return partitioner.getPartition(tableName, key).get(tableName, key);
130 }
131
132 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700133 public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800134 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800135 return partitioner.getPartition(tableName, key).put(tableName, key, value);
136 }
137
138 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700139 public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800140 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800141 return partitioner.getPartition(tableName, key).remove(tableName, key);
142 }
143
144 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700145 public CompletableFuture<Result<Void>> clear(String tableName) {
146 AtomicBoolean isLocked = new AtomicBoolean(false);
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800147 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800148 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800149 .stream()
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700150 .map(p -> p.clear(tableName)
151 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
152 .toArray(CompletableFuture[]::new))
153 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
Madan Jampani94c23532015-02-05 17:40:01 -0800154 }
155
156 @Override
157 public CompletableFuture<Set<String>> keySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800158 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800159 Set<String> keySet = Sets.newConcurrentHashSet();
160 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800161 .stream()
162 .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
163 .toArray(CompletableFuture[]::new))
164 .thenApply(v -> keySet);
165 }
166
167 @Override
168 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800169 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800170 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
171 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800172 .stream()
Madan Jampani393e0f02015-02-12 07:35:39 +0530173 .map(p -> p.values(tableName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800174 .toArray(CompletableFuture[]::new))
175 .thenApply(v -> values);
176 }
177
178 @Override
179 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800180 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800181 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
182 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800183 .stream()
184 .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
185 .toArray(CompletableFuture[]::new))
186 .thenApply(v -> entrySet);
187 }
188
189 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700190 public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800191 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800192 return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
193 }
194
195 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700196 public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800197 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800198 return partitioner.getPartition(tableName, key).remove(tableName, key, value);
199 }
200
201 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700202 public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800203 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800204 return partitioner.getPartition(tableName, key).remove(tableName, key, version);
205 }
206
207 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700208 public CompletableFuture<Result<Boolean>> replace(
209 String tableName, String key, byte[] oldValue, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800210 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800211 return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
212 }
213
214 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700215 public CompletableFuture<Result<Boolean>> replace(
216 String tableName, String key, long oldVersion, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800217 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800218 return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
219 }
220
221 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700222 public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
223 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
224 if (subTransactions.isEmpty()) {
225 return CompletableFuture.completedFuture(true);
226 } else if (subTransactions.size() == 1) {
227 Entry<Database, Transaction> entry =
228 subTransactions.entrySet().iterator().next();
229 return entry.getKey().prepareAndCommit(entry.getValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800230 } else {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700231 return new TransactionManager(this).execute(transaction);
Madan Jampani94c23532015-02-05 17:40:01 -0800232 }
233 }
234
235 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700236 public CompletableFuture<Boolean> prepare(Transaction transaction) {
237 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
238 AtomicBoolean status = new AtomicBoolean(true);
239 return CompletableFuture.allOf(subTransactions.entrySet()
240 .stream()
241 .map(entry -> entry
242 .getKey()
243 .prepare(entry.getValue())
244 .thenApply(v -> status.compareAndSet(true, v)))
245 .toArray(CompletableFuture[]::new))
246 .thenApply(v -> status.get());
247 }
248
249 @Override
250 public CompletableFuture<Boolean> commit(Transaction transaction) {
251 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
252 return CompletableFuture.allOf(subTransactions.entrySet()
253 .stream()
254 .map(entry -> entry.getKey().commit(entry.getValue()))
255 .toArray(CompletableFuture[]::new))
256 .thenApply(v -> true);
257 }
258
259 @Override
260 public CompletableFuture<Boolean> rollback(Transaction transaction) {
261 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
262 return CompletableFuture.allOf(subTransactions.entrySet()
263 .stream()
264 .map(entry -> entry.getKey().rollback(entry.getValue()))
265 .toArray(CompletableFuture[]::new))
266 .thenApply(v -> true);
267 }
268
269 @Override
Madan Jampanif1b8e172015-03-23 11:42:02 -0700270 public CompletableFuture<Database> open() {
271 return CompletableFuture.allOf(partitions
272 .stream()
273 .map(Database::open)
274 .toArray(CompletableFuture[]::new))
275 .thenApply(v -> {
276 isOpen.set(true);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700277 return this;
278 });
Madan Jampani94c23532015-02-05 17:40:01 -0800279 }
280
281 @Override
282 public CompletableFuture<Void> close() {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800283 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700284 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800285 .stream()
286 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800287 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800288 }
Madan Jampanif1b8e172015-03-23 11:42:02 -0700289
290 @Override
291 public boolean isClosed() {
292 return !isOpen.get();
293 }
294
295 @Override
296 public String name() {
297 return name;
298 }
299
300 @Override
301 public Cluster cluster() {
302 throw new UnsupportedOperationException();
303 }
304
305 @Override
306 public Database addStartupTask(Task<CompletableFuture<Void>> task) {
307 throw new UnsupportedOperationException();
308 }
309
310 @Override
311 public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
312 throw new UnsupportedOperationException();
313 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700314
315 private Map<Database, Transaction> createSubTransactions(
316 Transaction transaction) {
317 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
318 for (DatabaseUpdate update : transaction.updates()) {
319 Database partition = partitioner.getPartition(update.tableName(), update.key());
320 List<DatabaseUpdate> partitionUpdates =
321 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
322 partitionUpdates.add(update);
323 }
324 Map<Database, Transaction> subTransactions = Maps.newHashMap();
325 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
326
327 return subTransactions;
328 }
329}