blob: 09b3f597b1f6595e0414fe7d328ec11339ac78aa [file] [log] [blame]
Madan Jampani25461112015-02-17 14:17:29 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Collection;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
25import java.util.concurrent.CopyOnWriteArrayList;
26import java.util.concurrent.atomic.AtomicBoolean;
27import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani648451f2015-07-21 22:09:05 -070028import java.util.function.Consumer;
Madan Jampanif1b8e172015-03-23 11:42:02 -070029import java.util.stream.Collectors;
Madan Jampani393e0f02015-02-12 07:35:39 +053030
Madan Jampani63c659f2015-06-11 00:52:58 -070031import org.onosproject.cluster.NodeId;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070032import org.onosproject.store.service.DatabaseUpdate;
33import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053034import org.onosproject.store.service.Versioned;
35
Madan Jampanibab51a42015-08-10 13:53:35 -070036import com.google.common.collect.ImmutableList;
Madan Jampani94c23532015-02-05 17:40:01 -080037import com.google.common.collect.Lists;
38import com.google.common.collect.Maps;
39import com.google.common.collect.Sets;
40
Madan Jampanif1b8e172015-03-23 11:42:02 -070041import net.kuujo.copycat.Task;
42import net.kuujo.copycat.cluster.Cluster;
Madan Jampanied1b7fc2015-04-27 23:30:07 -070043import net.kuujo.copycat.resource.ResourceState;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080044import static com.google.common.base.Preconditions.checkState;
45
Madan Jampani94c23532015-02-05 17:40:01 -080046/**
47 * A database that partitions the keys across one or more database partitions.
48 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070049public class PartitionedDatabase implements Database {
Madan Jampani94c23532015-02-05 17:40:01 -080050
Madan Jampanif1b8e172015-03-23 11:42:02 -070051 private final String name;
52 private final Partitioner<String> partitioner;
53 private final List<Database> partitions;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080054 private final AtomicBoolean isOpen = new AtomicBoolean(false);
Madan Jampanif1b8e172015-03-23 11:42:02 -070055 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
Madan Jampani50589ac2015-06-08 11:38:46 -070056 private TransactionManager transactionManager;
Madan Jampani94c23532015-02-05 17:40:01 -080057
Madan Jampanif1b8e172015-03-23 11:42:02 -070058 public PartitionedDatabase(
59 String name,
60 Collection<Database> partitions) {
61 this.name = name;
62 this.partitions = partitions
63 .stream()
64 .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
65 .collect(Collectors.toList());
66 this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
67 }
68
69 /**
70 * Returns the databases for individual partitions.
71 * @return list of database partitions
72 */
73 public List<Database> getPartitions() {
74 return partitions;
Madan Jampani94c23532015-02-05 17:40:01 -080075 }
76
Madan Jampani7f72c3f2015-03-01 17:34:59 -080077 /**
78 * Returns true if the database is open.
79 * @return true if open, false otherwise
80 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070081 @Override
Madan Jampani7f72c3f2015-03-01 17:34:59 -080082 public boolean isOpen() {
83 return isOpen.get();
84 }
85
Madan Jampani94c23532015-02-05 17:40:01 -080086 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070087 public CompletableFuture<Set<String>> maps() {
Madan Jampania89f8f92015-04-01 14:39:54 -070088 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -070089 Set<String> mapNames = Sets.newConcurrentHashSet();
Madan Jampania89f8f92015-04-01 14:39:54 -070090 return CompletableFuture.allOf(partitions
91 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -070092 .map(db -> db.maps().thenApply(mapNames::addAll))
Madan Jampania89f8f92015-04-01 14:39:54 -070093 .toArray(CompletableFuture[]::new))
Madan Jampani7804c992015-07-20 13:20:19 -070094 .thenApply(v -> mapNames);
Madan Jampania89f8f92015-04-01 14:39:54 -070095 }
96
97 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070098 public CompletableFuture<Map<String, Long>> counters() {
99 checkState(isOpen.get(), DB_NOT_OPEN);
100 Map<String, Long> counters = Maps.newConcurrentMap();
101 return CompletableFuture.allOf(partitions
102 .stream()
103 .map(db -> db.counters()
104 .thenApply(m -> {
105 counters.putAll(m);
106 return null;
107 }))
108 .toArray(CompletableFuture[]::new))
109 .thenApply(v -> counters);
110 }
111
112 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700113 public CompletableFuture<Integer> mapSize(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800114 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800115 AtomicInteger totalSize = new AtomicInteger(0);
116 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800117 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700118 .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
Madan Jampani94c23532015-02-05 17:40:01 -0800119 .toArray(CompletableFuture[]::new))
120 .thenApply(v -> totalSize.get());
121 }
122
123 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700124 public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800125 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700126 return mapSize(mapName).thenApply(size -> size == 0);
Madan Jampani94c23532015-02-05 17:40:01 -0800127 }
128
129 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700130 public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800131 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700132 return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800133 }
134
135 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700136 public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800137 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800138 AtomicBoolean containsValue = new AtomicBoolean(false);
139 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800140 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700141 .map(p -> p.mapContainsValue(mapName, value)
142 .thenApply(v -> containsValue.compareAndSet(false, v)))
Madan Jampani94c23532015-02-05 17:40:01 -0800143 .toArray(CompletableFuture[]::new))
144 .thenApply(v -> containsValue.get());
145 }
146
147 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700148 public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800149 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700150 return partitioner.getPartition(mapName, key).mapGet(mapName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800151 }
152
153 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700154 public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
155 String mapName, String key, Match<byte[]> valueMatch,
156 Match<Long> versionMatch, byte[] value) {
157 return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value);
158
Madan Jampani94c23532015-02-05 17:40:01 -0800159 }
160
161 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700162 public CompletableFuture<Result<Void>> mapClear(String mapName) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700163 AtomicBoolean isLocked = new AtomicBoolean(false);
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800164 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800165 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800166 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700167 .map(p -> p.mapClear(mapName)
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700168 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
169 .toArray(CompletableFuture[]::new))
170 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
Madan Jampani94c23532015-02-05 17:40:01 -0800171 }
172
173 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700174 public CompletableFuture<Set<String>> mapKeySet(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800175 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800176 Set<String> keySet = Sets.newConcurrentHashSet();
177 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800178 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700179 .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800180 .toArray(CompletableFuture[]::new))
181 .thenApply(v -> keySet);
182 }
183
184 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700185 public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800186 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800187 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
188 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800189 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700190 .map(p -> p.mapValues(mapName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800191 .toArray(CompletableFuture[]::new))
192 .thenApply(v -> values);
193 }
194
195 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700196 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800197 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800198 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
199 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800200 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700201 .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800202 .toArray(CompletableFuture[]::new))
203 .thenApply(v -> entrySet);
204 }
205
206 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700207 public CompletableFuture<Long> counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700208 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700209 return partitioner.getPartition(counterName, counterName).counterGet(counterName);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700210 }
211
212 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700213 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700214 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700215 return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
216 }
217
218 @Override
219 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
220 checkState(isOpen.get(), DB_NOT_OPEN);
221 return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700222 }
223
Madan Jampani63c659f2015-06-11 00:52:58 -0700224
225 @Override
226 public CompletableFuture<Long> queueSize(String queueName) {
227 checkState(isOpen.get(), DB_NOT_OPEN);
228 return partitioner.getPartition(queueName, queueName).queueSize(queueName);
229 }
230
231 @Override
232 public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
233 checkState(isOpen.get(), DB_NOT_OPEN);
234 return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
235 }
236
237 @Override
238 public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
239 checkState(isOpen.get(), DB_NOT_OPEN);
240 return partitioner.getPartition(queueName, queueName).queuePop(queueName, nodeId);
241 }
242
243 @Override
244 public CompletableFuture<byte[]> queuePeek(String queueName) {
245 checkState(isOpen.get(), DB_NOT_OPEN);
246 return partitioner.getPartition(queueName, queueName).queuePeek(queueName);
247 }
248
Madan Jampanib5d72d52015-04-03 16:53:50 -0700249 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700250 public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700251 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
252 if (subTransactions.isEmpty()) {
Madan Jampanibab51a42015-08-10 13:53:35 -0700253 return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700254 } else if (subTransactions.size() == 1) {
255 Entry<Database, Transaction> entry =
256 subTransactions.entrySet().iterator().next();
257 return entry.getKey().prepareAndCommit(entry.getValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800258 } else {
Madan Jampani98166f92015-06-26 15:12:33 -0700259 if (transactionManager == null) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700260 throw new IllegalStateException("TransactionManager is not initialized");
261 }
262 return transactionManager.execute(transaction);
Madan Jampani94c23532015-02-05 17:40:01 -0800263 }
264 }
265
266 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700267 public CompletableFuture<Boolean> prepare(Transaction transaction) {
268 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
269 AtomicBoolean status = new AtomicBoolean(true);
270 return CompletableFuture.allOf(subTransactions.entrySet()
271 .stream()
272 .map(entry -> entry
273 .getKey()
274 .prepare(entry.getValue())
275 .thenApply(v -> status.compareAndSet(true, v)))
276 .toArray(CompletableFuture[]::new))
277 .thenApply(v -> status.get());
278 }
279
280 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700281 public CompletableFuture<CommitResponse> commit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700282 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
Madan Jampanibab51a42015-08-10 13:53:35 -0700283 AtomicBoolean success = new AtomicBoolean(true);
284 List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700285 return CompletableFuture.allOf(subTransactions.entrySet()
Madan Jampanibab51a42015-08-10 13:53:35 -0700286 .stream()
287 .map(entry -> entry.getKey().commit(entry.getValue())
288 .thenAccept(response -> {
289 success.set(success.get() && response.success());
290 if (success.get()) {
291 allUpdates.addAll(response.updates());
292 }
293 }))
294 .toArray(CompletableFuture[]::new))
295 .thenApply(v -> success.get() ?
296 CommitResponse.success(allUpdates) : CommitResponse.failure());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700297 }
298
299 @Override
300 public CompletableFuture<Boolean> rollback(Transaction transaction) {
301 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
302 return CompletableFuture.allOf(subTransactions.entrySet()
303 .stream()
304 .map(entry -> entry.getKey().rollback(entry.getValue()))
305 .toArray(CompletableFuture[]::new))
306 .thenApply(v -> true);
307 }
308
309 @Override
Madan Jampanif1b8e172015-03-23 11:42:02 -0700310 public CompletableFuture<Database> open() {
311 return CompletableFuture.allOf(partitions
312 .stream()
313 .map(Database::open)
314 .toArray(CompletableFuture[]::new))
315 .thenApply(v -> {
316 isOpen.set(true);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700317 return this;
318 });
Madan Jampani94c23532015-02-05 17:40:01 -0800319 }
320
321 @Override
322 public CompletableFuture<Void> close() {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800323 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700324 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800325 .stream()
326 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800327 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800328 }
Madan Jampanif1b8e172015-03-23 11:42:02 -0700329
330 @Override
331 public boolean isClosed() {
332 return !isOpen.get();
333 }
334
335 @Override
336 public String name() {
337 return name;
338 }
339
340 @Override
341 public Cluster cluster() {
342 throw new UnsupportedOperationException();
343 }
344
345 @Override
346 public Database addStartupTask(Task<CompletableFuture<Void>> task) {
347 throw new UnsupportedOperationException();
348 }
349
350 @Override
351 public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
352 throw new UnsupportedOperationException();
353 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700354
Madan Jampanied1b7fc2015-04-27 23:30:07 -0700355 @Override
356 public ResourceState state() {
357 throw new UnsupportedOperationException();
358 }
359
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700360 private Map<Database, Transaction> createSubTransactions(
361 Transaction transaction) {
362 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
363 for (DatabaseUpdate update : transaction.updates()) {
Madan Jampani7804c992015-07-20 13:20:19 -0700364 Database partition = partitioner.getPartition(update.mapName(), update.key());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700365 List<DatabaseUpdate> partitionUpdates =
366 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
367 partitionUpdates.add(update);
368 }
369 Map<Database, Transaction> subTransactions = Maps.newHashMap();
370 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700371 return subTransactions;
372 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700373
Ayaka Koshibe94cc01b2015-06-26 15:39:11 -0700374 protected void setTransactionManager(TransactionManager transactionManager) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700375 this.transactionManager = transactionManager;
376 }
Madan Jampani648451f2015-07-21 22:09:05 -0700377
378 @Override
Madan Jampani648451f2015-07-21 22:09:05 -0700379 public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
Madan Jampani34fec842015-07-22 14:05:08 -0700380 partitions.forEach(p -> p.registerConsumer(consumer));
Madan Jampani648451f2015-07-21 22:09:05 -0700381 }
382
383 @Override
384 public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
Madan Jampani34fec842015-07-22 14:05:08 -0700385 partitions.forEach(p -> p.unregisterConsumer(consumer));
Madan Jampani648451f2015-07-21 22:09:05 -0700386 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700387}