blob: f741b3679c89da84245ecb83e6170dd045078916 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070019import com.google.common.collect.ImmutableList;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import net.kuujo.copycat.Task;
24import net.kuujo.copycat.cluster.Cluster;
25import net.kuujo.copycat.resource.ResourceState;
26import org.onosproject.store.service.DatabaseUpdate;
27import org.onosproject.store.service.Transaction;
28import org.onosproject.store.service.Versioned;
29
Madan Jampani94c23532015-02-05 17:40:01 -080030import java.util.Collection;
31import java.util.List;
32import java.util.Map;
33import java.util.Map.Entry;
34import java.util.Set;
35import java.util.concurrent.CompletableFuture;
36import java.util.concurrent.CopyOnWriteArrayList;
37import java.util.concurrent.atomic.AtomicBoolean;
38import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani648451f2015-07-21 22:09:05 -070039import java.util.function.Consumer;
Madan Jampanif1b8e172015-03-23 11:42:02 -070040import java.util.stream.Collectors;
Madan Jampani393e0f02015-02-12 07:35:39 +053041
Madan Jampani7f72c3f2015-03-01 17:34:59 -080042import static com.google.common.base.Preconditions.checkState;
43
Madan Jampani94c23532015-02-05 17:40:01 -080044/**
45 * A database that partitions the keys across one or more database partitions.
46 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070047public class PartitionedDatabase implements Database {
Madan Jampani94c23532015-02-05 17:40:01 -080048
Madan Jampanif1b8e172015-03-23 11:42:02 -070049 private final String name;
50 private final Partitioner<String> partitioner;
51 private final List<Database> partitions;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080052 private final AtomicBoolean isOpen = new AtomicBoolean(false);
Madan Jampanif1b8e172015-03-23 11:42:02 -070053 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
Madan Jampani50589ac2015-06-08 11:38:46 -070054 private TransactionManager transactionManager;
Madan Jampani94c23532015-02-05 17:40:01 -080055
Madan Jampanif1b8e172015-03-23 11:42:02 -070056 public PartitionedDatabase(
57 String name,
58 Collection<Database> partitions) {
59 this.name = name;
60 this.partitions = partitions
61 .stream()
62 .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
63 .collect(Collectors.toList());
64 this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
65 }
66
67 /**
68 * Returns the databases for individual partitions.
69 * @return list of database partitions
70 */
71 public List<Database> getPartitions() {
72 return partitions;
Madan Jampani94c23532015-02-05 17:40:01 -080073 }
74
Madan Jampani7f72c3f2015-03-01 17:34:59 -080075 /**
76 * Returns true if the database is open.
77 * @return true if open, false otherwise
78 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070079 @Override
Madan Jampani7f72c3f2015-03-01 17:34:59 -080080 public boolean isOpen() {
81 return isOpen.get();
82 }
83
Madan Jampani94c23532015-02-05 17:40:01 -080084 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070085 public CompletableFuture<Set<String>> maps() {
Madan Jampania89f8f92015-04-01 14:39:54 -070086 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -070087 Set<String> mapNames = Sets.newConcurrentHashSet();
Madan Jampania89f8f92015-04-01 14:39:54 -070088 return CompletableFuture.allOf(partitions
89 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -070090 .map(db -> db.maps().thenApply(mapNames::addAll))
Madan Jampania89f8f92015-04-01 14:39:54 -070091 .toArray(CompletableFuture[]::new))
Madan Jampani7804c992015-07-20 13:20:19 -070092 .thenApply(v -> mapNames);
Madan Jampania89f8f92015-04-01 14:39:54 -070093 }
94
95 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070096 public CompletableFuture<Map<String, Long>> counters() {
97 checkState(isOpen.get(), DB_NOT_OPEN);
98 Map<String, Long> counters = Maps.newConcurrentMap();
99 return CompletableFuture.allOf(partitions
100 .stream()
101 .map(db -> db.counters()
andreafd912ac2015-10-02 14:58:35 -0700102 .thenApply(m -> {
103 counters.putAll(m);
104 return null;
105 }))
Madan Jampanib5d72d52015-04-03 16:53:50 -0700106 .toArray(CompletableFuture[]::new))
107 .thenApply(v -> counters);
108 }
109
110 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700111 public CompletableFuture<Integer> mapSize(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800112 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800113 AtomicInteger totalSize = new AtomicInteger(0);
114 return CompletableFuture.allOf(partitions
andreafd912ac2015-10-02 14:58:35 -0700115 .stream()
116 .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
117 .toArray(CompletableFuture[]::new))
Madan Jampani94c23532015-02-05 17:40:01 -0800118 .thenApply(v -> totalSize.get());
119 }
120
121 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700122 public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800123 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700124 return mapSize(mapName).thenApply(size -> size == 0);
Madan Jampani94c23532015-02-05 17:40:01 -0800125 }
126
127 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700128 public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800129 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700130 return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800131 }
132
133 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700134 public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800135 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800136 AtomicBoolean containsValue = new AtomicBoolean(false);
137 return CompletableFuture.allOf(partitions
andreafd912ac2015-10-02 14:58:35 -0700138 .stream()
139 .map(p -> p.mapContainsValue(mapName, value)
140 .thenApply(v -> containsValue.compareAndSet(false, v)))
141 .toArray(CompletableFuture[]::new))
Madan Jampani94c23532015-02-05 17:40:01 -0800142 .thenApply(v -> containsValue.get());
143 }
144
145 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700146 public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800147 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700148 return partitioner.getPartition(mapName, key).mapGet(mapName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800149 }
150
151 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700152 public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
153 String mapName, String key, Match<byte[]> valueMatch,
154 Match<Long> versionMatch, byte[] value) {
155 return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value);
156
Madan Jampani94c23532015-02-05 17:40:01 -0800157 }
158
159 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700160 public CompletableFuture<Result<Void>> mapClear(String mapName) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700161 AtomicBoolean isLocked = new AtomicBoolean(false);
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800162 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800163 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800164 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700165 .map(p -> p.mapClear(mapName)
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700166 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
167 .toArray(CompletableFuture[]::new))
168 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
Madan Jampani94c23532015-02-05 17:40:01 -0800169 }
170
171 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700172 public CompletableFuture<Set<String>> mapKeySet(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800173 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800174 Set<String> keySet = Sets.newConcurrentHashSet();
175 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800176 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700177 .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800178 .toArray(CompletableFuture[]::new))
179 .thenApply(v -> keySet);
180 }
181
182 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700183 public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800184 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800185 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
186 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800187 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700188 .map(p -> p.mapValues(mapName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800189 .toArray(CompletableFuture[]::new))
190 .thenApply(v -> values);
191 }
192
193 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700194 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800195 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800196 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
197 return CompletableFuture.allOf(partitions
andreafd912ac2015-10-02 14:58:35 -0700198 .stream()
199 .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
200 .toArray(CompletableFuture[]::new))
Madan Jampani94c23532015-02-05 17:40:01 -0800201 .thenApply(v -> entrySet);
202 }
203
204 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700205 public CompletableFuture<Long> counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700206 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700207 return partitioner.getPartition(counterName, counterName).counterGet(counterName);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700208 }
209
210 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700211 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700212 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700213 return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
214 }
215
216 @Override
217 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
218 checkState(isOpen.get(), DB_NOT_OPEN);
219 return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700220 }
221
andreafd912ac2015-10-02 14:58:35 -0700222 @Override
223 public CompletableFuture<Void> counterSet(String counterName, long value) {
224 checkState(isOpen.get(), DB_NOT_OPEN);
225 return partitioner.getPartition(counterName, counterName).counterSet(counterName, value);
226 }
Madan Jampani63c659f2015-06-11 00:52:58 -0700227
228 @Override
Aaron Kruglikov82fd6322015-10-06 12:02:46 -0700229 public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
230 checkState(isOpen.get(), DB_NOT_OPEN);
231 return partitioner.getPartition(counterName, counterName).
232 counterCompareAndSet(counterName, expectedValue, updateValue);
233
234 }
235
236 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700237 public CompletableFuture<Long> queueSize(String queueName) {
238 checkState(isOpen.get(), DB_NOT_OPEN);
239 return partitioner.getPartition(queueName, queueName).queueSize(queueName);
240 }
241
242 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700243 public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
Madan Jampani63c659f2015-06-11 00:52:58 -0700244 checkState(isOpen.get(), DB_NOT_OPEN);
245 return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
246 }
247
248 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700249 public CompletableFuture<byte[]> queuePop(String queueName) {
Madan Jampani63c659f2015-06-11 00:52:58 -0700250 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampania6d787b2015-08-11 11:02:02 -0700251 return partitioner.getPartition(queueName, queueName).queuePop(queueName);
Madan Jampani63c659f2015-06-11 00:52:58 -0700252 }
253
254 @Override
255 public CompletableFuture<byte[]> queuePeek(String queueName) {
256 checkState(isOpen.get(), DB_NOT_OPEN);
257 return partitioner.getPartition(queueName, queueName).queuePeek(queueName);
258 }
259
Madan Jampanib5d72d52015-04-03 16:53:50 -0700260 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700261 public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700262 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
263 if (subTransactions.isEmpty()) {
Madan Jampanibab51a42015-08-10 13:53:35 -0700264 return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700265 } else if (subTransactions.size() == 1) {
266 Entry<Database, Transaction> entry =
267 subTransactions.entrySet().iterator().next();
268 return entry.getKey().prepareAndCommit(entry.getValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800269 } else {
Madan Jampani98166f92015-06-26 15:12:33 -0700270 if (transactionManager == null) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700271 throw new IllegalStateException("TransactionManager is not initialized");
272 }
273 return transactionManager.execute(transaction);
Madan Jampani94c23532015-02-05 17:40:01 -0800274 }
275 }
276
277 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700278 public CompletableFuture<Boolean> prepare(Transaction transaction) {
279 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
280 AtomicBoolean status = new AtomicBoolean(true);
281 return CompletableFuture.allOf(subTransactions.entrySet()
282 .stream()
andreafd912ac2015-10-02 14:58:35 -0700283 .map(entry -> entry
284 .getKey()
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700285 .prepare(entry.getValue())
286 .thenApply(v -> status.compareAndSet(true, v)))
287 .toArray(CompletableFuture[]::new))
288 .thenApply(v -> status.get());
289 }
290
291 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700292 public CompletableFuture<CommitResponse> commit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700293 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
Madan Jampanibab51a42015-08-10 13:53:35 -0700294 AtomicBoolean success = new AtomicBoolean(true);
295 List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700296 return CompletableFuture.allOf(subTransactions.entrySet()
andreafd912ac2015-10-02 14:58:35 -0700297 .stream()
298 .map(entry -> entry.getKey().commit(entry.getValue())
299 .thenAccept(response -> {
300 success.set(success.get() && response.success());
301 if (success.get()) {
302 allUpdates.addAll(response.updates());
303 }
304 }))
305 .toArray(CompletableFuture[]::new))
Madan Jampanibab51a42015-08-10 13:53:35 -0700306 .thenApply(v -> success.get() ?
307 CommitResponse.success(allUpdates) : CommitResponse.failure());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700308 }
309
310 @Override
311 public CompletableFuture<Boolean> rollback(Transaction transaction) {
312 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
313 return CompletableFuture.allOf(subTransactions.entrySet()
314 .stream()
315 .map(entry -> entry.getKey().rollback(entry.getValue()))
andreafd912ac2015-10-02 14:58:35 -0700316 .toArray(CompletableFuture[]::new))
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700317 .thenApply(v -> true);
318 }
319
320 @Override
Madan Jampanif1b8e172015-03-23 11:42:02 -0700321 public CompletableFuture<Database> open() {
322 return CompletableFuture.allOf(partitions
323 .stream()
324 .map(Database::open)
325 .toArray(CompletableFuture[]::new))
326 .thenApply(v -> {
327 isOpen.set(true);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700328 return this;
329 });
Madan Jampani94c23532015-02-05 17:40:01 -0800330 }
331
332 @Override
333 public CompletableFuture<Void> close() {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800334 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700335 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800336 .stream()
337 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800338 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800339 }
Madan Jampanif1b8e172015-03-23 11:42:02 -0700340
341 @Override
342 public boolean isClosed() {
343 return !isOpen.get();
344 }
345
346 @Override
347 public String name() {
348 return name;
349 }
350
351 @Override
352 public Cluster cluster() {
353 throw new UnsupportedOperationException();
354 }
355
356 @Override
357 public Database addStartupTask(Task<CompletableFuture<Void>> task) {
358 throw new UnsupportedOperationException();
359 }
360
361 @Override
362 public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
363 throw new UnsupportedOperationException();
364 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700365
Madan Jampanied1b7fc2015-04-27 23:30:07 -0700366 @Override
367 public ResourceState state() {
368 throw new UnsupportedOperationException();
369 }
370
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700371 private Map<Database, Transaction> createSubTransactions(
372 Transaction transaction) {
373 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
374 for (DatabaseUpdate update : transaction.updates()) {
Madan Jampani7804c992015-07-20 13:20:19 -0700375 Database partition = partitioner.getPartition(update.mapName(), update.key());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700376 List<DatabaseUpdate> partitionUpdates =
377 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
378 partitionUpdates.add(update);
379 }
380 Map<Database, Transaction> subTransactions = Maps.newHashMap();
381 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700382 return subTransactions;
383 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700384
Ayaka Koshibe94cc01b2015-06-26 15:39:11 -0700385 protected void setTransactionManager(TransactionManager transactionManager) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700386 this.transactionManager = transactionManager;
387 }
Madan Jampani648451f2015-07-21 22:09:05 -0700388
389 @Override
Madan Jampani648451f2015-07-21 22:09:05 -0700390 public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
Madan Jampani34fec842015-07-22 14:05:08 -0700391 partitions.forEach(p -> p.registerConsumer(consumer));
Madan Jampani648451f2015-07-21 22:09:05 -0700392 }
393
394 @Override
395 public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
Madan Jampani34fec842015-07-22 14:05:08 -0700396 partitions.forEach(p -> p.unregisterConsumer(consumer));
Madan Jampani648451f2015-07-21 22:09:05 -0700397 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700398}
andreafd912ac2015-10-02 14:58:35 -0700399