blob: 6c537c0f557718d87148949d8471a80061720c6c [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Collection;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
25import java.util.concurrent.CopyOnWriteArrayList;
26import java.util.concurrent.atomic.AtomicBoolean;
27import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani648451f2015-07-21 22:09:05 -070028import java.util.function.Consumer;
Madan Jampanif1b8e172015-03-23 11:42:02 -070029import java.util.stream.Collectors;
Madan Jampani393e0f02015-02-12 07:35:39 +053030
Madan Jampani63c659f2015-06-11 00:52:58 -070031import org.onosproject.cluster.NodeId;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070032import org.onosproject.store.service.DatabaseUpdate;
33import org.onosproject.store.service.Transaction;
Madan Jampani393e0f02015-02-12 07:35:39 +053034import org.onosproject.store.service.Versioned;
35
Madan Jampani94c23532015-02-05 17:40:01 -080036import com.google.common.collect.Lists;
37import com.google.common.collect.Maps;
38import com.google.common.collect.Sets;
39
Madan Jampanif1b8e172015-03-23 11:42:02 -070040import net.kuujo.copycat.Task;
41import net.kuujo.copycat.cluster.Cluster;
Madan Jampanied1b7fc2015-04-27 23:30:07 -070042import net.kuujo.copycat.resource.ResourceState;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080043import static com.google.common.base.Preconditions.checkState;
44
Madan Jampani94c23532015-02-05 17:40:01 -080045/**
46 * A database that partitions the keys across one or more database partitions.
47 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070048public class PartitionedDatabase implements Database {
Madan Jampani94c23532015-02-05 17:40:01 -080049
Madan Jampanif1b8e172015-03-23 11:42:02 -070050 private final String name;
51 private final Partitioner<String> partitioner;
52 private final List<Database> partitions;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080053 private final AtomicBoolean isOpen = new AtomicBoolean(false);
Madan Jampanif1b8e172015-03-23 11:42:02 -070054 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
Madan Jampani50589ac2015-06-08 11:38:46 -070055 private TransactionManager transactionManager;
Madan Jampani94c23532015-02-05 17:40:01 -080056
Madan Jampanif1b8e172015-03-23 11:42:02 -070057 public PartitionedDatabase(
58 String name,
59 Collection<Database> partitions) {
60 this.name = name;
61 this.partitions = partitions
62 .stream()
63 .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
64 .collect(Collectors.toList());
65 this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
66 }
67
68 /**
69 * Returns the databases for individual partitions.
70 * @return list of database partitions
71 */
72 public List<Database> getPartitions() {
73 return partitions;
Madan Jampani94c23532015-02-05 17:40:01 -080074 }
75
Madan Jampani7f72c3f2015-03-01 17:34:59 -080076 /**
77 * Returns true if the database is open.
78 * @return true if open, false otherwise
79 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070080 @Override
Madan Jampani7f72c3f2015-03-01 17:34:59 -080081 public boolean isOpen() {
82 return isOpen.get();
83 }
84
Madan Jampani94c23532015-02-05 17:40:01 -080085 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070086 public CompletableFuture<Set<String>> maps() {
Madan Jampania89f8f92015-04-01 14:39:54 -070087 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -070088 Set<String> mapNames = Sets.newConcurrentHashSet();
Madan Jampania89f8f92015-04-01 14:39:54 -070089 return CompletableFuture.allOf(partitions
90 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -070091 .map(db -> db.maps().thenApply(mapNames::addAll))
Madan Jampania89f8f92015-04-01 14:39:54 -070092 .toArray(CompletableFuture[]::new))
Madan Jampani7804c992015-07-20 13:20:19 -070093 .thenApply(v -> mapNames);
Madan Jampania89f8f92015-04-01 14:39:54 -070094 }
95
96 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070097 public CompletableFuture<Map<String, Long>> counters() {
98 checkState(isOpen.get(), DB_NOT_OPEN);
99 Map<String, Long> counters = Maps.newConcurrentMap();
100 return CompletableFuture.allOf(partitions
101 .stream()
102 .map(db -> db.counters()
103 .thenApply(m -> {
104 counters.putAll(m);
105 return null;
106 }))
107 .toArray(CompletableFuture[]::new))
108 .thenApply(v -> counters);
109 }
110
111 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700112 public CompletableFuture<Integer> mapSize(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800113 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800114 AtomicInteger totalSize = new AtomicInteger(0);
115 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800116 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700117 .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
Madan Jampani94c23532015-02-05 17:40:01 -0800118 .toArray(CompletableFuture[]::new))
119 .thenApply(v -> totalSize.get());
120 }
121
122 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700123 public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800124 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700125 return mapSize(mapName).thenApply(size -> size == 0);
Madan Jampani94c23532015-02-05 17:40:01 -0800126 }
127
128 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700129 public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800130 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700131 return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800132 }
133
134 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700135 public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800136 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800137 AtomicBoolean containsValue = new AtomicBoolean(false);
138 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800139 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700140 .map(p -> p.mapContainsValue(mapName, value)
141 .thenApply(v -> containsValue.compareAndSet(false, v)))
Madan Jampani94c23532015-02-05 17:40:01 -0800142 .toArray(CompletableFuture[]::new))
143 .thenApply(v -> containsValue.get());
144 }
145
146 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700147 public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800148 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700149 return partitioner.getPartition(mapName, key).mapGet(mapName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800150 }
151
152 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700153 public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
154 String mapName, String key, Match<byte[]> valueMatch,
155 Match<Long> versionMatch, byte[] value) {
156 return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value);
157
Madan Jampani94c23532015-02-05 17:40:01 -0800158 }
159
160 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700161 public CompletableFuture<Result<Void>> mapClear(String mapName) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700162 AtomicBoolean isLocked = new AtomicBoolean(false);
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800163 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800164 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800165 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700166 .map(p -> p.mapClear(mapName)
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700167 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
168 .toArray(CompletableFuture[]::new))
169 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
Madan Jampani94c23532015-02-05 17:40:01 -0800170 }
171
172 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700173 public CompletableFuture<Set<String>> mapKeySet(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800174 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800175 Set<String> keySet = Sets.newConcurrentHashSet();
176 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800177 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700178 .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800179 .toArray(CompletableFuture[]::new))
180 .thenApply(v -> keySet);
181 }
182
183 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700184 public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800185 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800186 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
187 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800188 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700189 .map(p -> p.mapValues(mapName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800190 .toArray(CompletableFuture[]::new))
191 .thenApply(v -> values);
192 }
193
194 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700195 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800196 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800197 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
198 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800199 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700200 .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800201 .toArray(CompletableFuture[]::new))
202 .thenApply(v -> entrySet);
203 }
204
205 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700206 public CompletableFuture<Long> counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700207 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700208 return partitioner.getPartition(counterName, counterName).counterGet(counterName);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700209 }
210
211 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700212 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700213 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700214 return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
215 }
216
217 @Override
218 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
219 checkState(isOpen.get(), DB_NOT_OPEN);
220 return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700221 }
222
Madan Jampani63c659f2015-06-11 00:52:58 -0700223
224 @Override
225 public CompletableFuture<Long> queueSize(String queueName) {
226 checkState(isOpen.get(), DB_NOT_OPEN);
227 return partitioner.getPartition(queueName, queueName).queueSize(queueName);
228 }
229
230 @Override
231 public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
232 checkState(isOpen.get(), DB_NOT_OPEN);
233 return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
234 }
235
236 @Override
237 public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
238 checkState(isOpen.get(), DB_NOT_OPEN);
239 return partitioner.getPartition(queueName, queueName).queuePop(queueName, nodeId);
240 }
241
242 @Override
243 public CompletableFuture<byte[]> queuePeek(String queueName) {
244 checkState(isOpen.get(), DB_NOT_OPEN);
245 return partitioner.getPartition(queueName, queueName).queuePeek(queueName);
246 }
247
Madan Jampanib5d72d52015-04-03 16:53:50 -0700248 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700249 public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
250 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
251 if (subTransactions.isEmpty()) {
252 return CompletableFuture.completedFuture(true);
253 } else if (subTransactions.size() == 1) {
254 Entry<Database, Transaction> entry =
255 subTransactions.entrySet().iterator().next();
256 return entry.getKey().prepareAndCommit(entry.getValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800257 } else {
Madan Jampani98166f92015-06-26 15:12:33 -0700258 if (transactionManager == null) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700259 throw new IllegalStateException("TransactionManager is not initialized");
260 }
261 return transactionManager.execute(transaction);
Madan Jampani94c23532015-02-05 17:40:01 -0800262 }
263 }
264
265 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700266 public CompletableFuture<Boolean> prepare(Transaction transaction) {
267 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
268 AtomicBoolean status = new AtomicBoolean(true);
269 return CompletableFuture.allOf(subTransactions.entrySet()
270 .stream()
271 .map(entry -> entry
272 .getKey()
273 .prepare(entry.getValue())
274 .thenApply(v -> status.compareAndSet(true, v)))
275 .toArray(CompletableFuture[]::new))
276 .thenApply(v -> status.get());
277 }
278
279 @Override
280 public CompletableFuture<Boolean> commit(Transaction transaction) {
281 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
282 return CompletableFuture.allOf(subTransactions.entrySet()
283 .stream()
284 .map(entry -> entry.getKey().commit(entry.getValue()))
285 .toArray(CompletableFuture[]::new))
286 .thenApply(v -> true);
287 }
288
289 @Override
290 public CompletableFuture<Boolean> rollback(Transaction transaction) {
291 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
292 return CompletableFuture.allOf(subTransactions.entrySet()
293 .stream()
294 .map(entry -> entry.getKey().rollback(entry.getValue()))
295 .toArray(CompletableFuture[]::new))
296 .thenApply(v -> true);
297 }
298
299 @Override
Madan Jampanif1b8e172015-03-23 11:42:02 -0700300 public CompletableFuture<Database> open() {
301 return CompletableFuture.allOf(partitions
302 .stream()
303 .map(Database::open)
304 .toArray(CompletableFuture[]::new))
305 .thenApply(v -> {
306 isOpen.set(true);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700307 return this;
308 });
Madan Jampani94c23532015-02-05 17:40:01 -0800309 }
310
311 @Override
312 public CompletableFuture<Void> close() {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800313 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700314 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800315 .stream()
316 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800317 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800318 }
Madan Jampanif1b8e172015-03-23 11:42:02 -0700319
320 @Override
321 public boolean isClosed() {
322 return !isOpen.get();
323 }
324
325 @Override
326 public String name() {
327 return name;
328 }
329
330 @Override
331 public Cluster cluster() {
332 throw new UnsupportedOperationException();
333 }
334
335 @Override
336 public Database addStartupTask(Task<CompletableFuture<Void>> task) {
337 throw new UnsupportedOperationException();
338 }
339
340 @Override
341 public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
342 throw new UnsupportedOperationException();
343 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700344
Madan Jampanied1b7fc2015-04-27 23:30:07 -0700345 @Override
346 public ResourceState state() {
347 throw new UnsupportedOperationException();
348 }
349
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700350 private Map<Database, Transaction> createSubTransactions(
351 Transaction transaction) {
352 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
353 for (DatabaseUpdate update : transaction.updates()) {
Madan Jampani7804c992015-07-20 13:20:19 -0700354 Database partition = partitioner.getPartition(update.mapName(), update.key());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700355 List<DatabaseUpdate> partitionUpdates =
356 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
357 partitionUpdates.add(update);
358 }
359 Map<Database, Transaction> subTransactions = Maps.newHashMap();
360 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700361 return subTransactions;
362 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700363
Ayaka Koshibe94cc01b2015-06-26 15:39:11 -0700364 protected void setTransactionManager(TransactionManager transactionManager) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700365 this.transactionManager = transactionManager;
366 }
Madan Jampani648451f2015-07-21 22:09:05 -0700367
368 @Override
Madan Jampani648451f2015-07-21 22:09:05 -0700369 public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
Madan Jampani34fec842015-07-22 14:05:08 -0700370 partitions.forEach(p -> p.registerConsumer(consumer));
Madan Jampani648451f2015-07-21 22:09:05 -0700371 }
372
373 @Override
374 public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
Madan Jampani34fec842015-07-22 14:05:08 -0700375 partitions.forEach(p -> p.unregisterConsumer(consumer));
Madan Jampani648451f2015-07-21 22:09:05 -0700376 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700377}