blob: 24ec92b13237c327af35bf41bad4f81074c45498 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
19import java.util.Collection;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
25import java.util.concurrent.CopyOnWriteArrayList;
26import java.util.concurrent.atomic.AtomicBoolean;
27import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani393e0f02015-02-12 07:35:39 +053028
29import org.onosproject.store.service.UpdateOperation;
30import org.onosproject.store.service.Versioned;
31
Madan Jampani94c23532015-02-05 17:40:01 -080032import com.google.common.collect.ImmutableMap;
33import com.google.common.collect.Lists;
34import com.google.common.collect.Maps;
35import com.google.common.collect.Sets;
36
37import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
38
Madan Jampani7f72c3f2015-03-01 17:34:59 -080039import static com.google.common.base.Preconditions.checkState;
40
Madan Jampani94c23532015-02-05 17:40:01 -080041/**
42 * A database that partitions the keys across one or more database partitions.
43 */
44public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
45
46 private Partitioner<String> partitioner;
47 private final ClusterCoordinator coordinator;
48 private final Map<String, Database> partitions = Maps.newConcurrentMap();
Madan Jampani7f72c3f2015-03-01 17:34:59 -080049 private final AtomicBoolean isOpen = new AtomicBoolean(false);
50 private static final String DB_NOT_OPEN = "Database is not open";
Madan Jampani94c23532015-02-05 17:40:01 -080051
52 protected PartitionedDatabase(ClusterCoordinator coordinator) {
53 this.coordinator = coordinator;
54 }
55
Madan Jampani7f72c3f2015-03-01 17:34:59 -080056 /**
57 * Returns true if the database is open.
58 * @return true if open, false otherwise
59 */
60 public boolean isOpen() {
61 return isOpen.get();
62 }
63
Madan Jampani94c23532015-02-05 17:40:01 -080064 @Override
65 public void registerPartition(String name, Database partition) {
66 partitions.put(name, partition);
67 }
68
69 @Override
70 public Map<String, Database> getRegisteredPartitions() {
71 return ImmutableMap.copyOf(partitions);
72 }
73
74 @Override
75 public CompletableFuture<Integer> size(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -080076 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -080077 AtomicInteger totalSize = new AtomicInteger(0);
78 return CompletableFuture.allOf(partitions
79 .values()
80 .stream()
81 .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
82 .toArray(CompletableFuture[]::new))
83 .thenApply(v -> totalSize.get());
84 }
85
86 @Override
87 public CompletableFuture<Boolean> isEmpty(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -080088 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -080089 return size(tableName).thenApply(size -> size == 0);
90 }
91
92 @Override
93 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -080094 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -080095 return partitioner.getPartition(tableName, key).containsKey(tableName, key);
96 }
97
98 @Override
99 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800100 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800101 AtomicBoolean containsValue = new AtomicBoolean(false);
102 return CompletableFuture.allOf(partitions
103 .values()
104 .stream()
105 .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
106 .toArray(CompletableFuture[]::new))
107 .thenApply(v -> containsValue.get());
108 }
109
110 @Override
111 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800112 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800113 return partitioner.getPartition(tableName, key).get(tableName, key);
114 }
115
116 @Override
117 public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800118 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800119 return partitioner.getPartition(tableName, key).put(tableName, key, value);
120 }
121
122 @Override
123 public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800124 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800125 return partitioner.getPartition(tableName, key).remove(tableName, key);
126 }
127
128 @Override
129 public CompletableFuture<Void> clear(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800130 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800131 return CompletableFuture.allOf(partitions
132 .values()
133 .stream()
134 .map(p -> p.clear(tableName))
135 .toArray(CompletableFuture[]::new));
136 }
137
138 @Override
139 public CompletableFuture<Set<String>> keySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800140 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800141 Set<String> keySet = Sets.newConcurrentHashSet();
142 return CompletableFuture.allOf(partitions
143 .values()
144 .stream()
145 .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
146 .toArray(CompletableFuture[]::new))
147 .thenApply(v -> keySet);
148 }
149
150 @Override
151 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800152 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800153 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
154 return CompletableFuture.allOf(partitions
155 .values()
156 .stream()
Madan Jampani393e0f02015-02-12 07:35:39 +0530157 .map(p -> p.values(tableName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800158 .toArray(CompletableFuture[]::new))
159 .thenApply(v -> values);
160 }
161
162 @Override
163 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800164 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800165 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
166 return CompletableFuture.allOf(partitions
167 .values()
168 .stream()
169 .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
170 .toArray(CompletableFuture[]::new))
171 .thenApply(v -> entrySet);
172 }
173
174 @Override
175 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800176 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800177 return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
178 }
179
180 @Override
181 public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800182 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800183 return partitioner.getPartition(tableName, key).remove(tableName, key, value);
184 }
185
186 @Override
187 public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800188 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800189 return partitioner.getPartition(tableName, key).remove(tableName, key, version);
190 }
191
192 @Override
193 public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800194 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800195 return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
196 }
197
198 @Override
199 public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800200 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800201 return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
202 }
203
204 @Override
205 public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800206 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800207 Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
208 for (UpdateOperation<String, byte[]> update : updates) {
209 Database partition = partitioner.getPartition(update.tableName(), update.key());
210 List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
211 if (partitionUpdates == null) {
212 partitionUpdates = Lists.newArrayList();
213 perPartitionUpdates.put(partition, partitionUpdates);
214 }
215 partitionUpdates.add(update);
216 }
217 if (perPartitionUpdates.size() > 1) {
218 // TODO
219 throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
220 } else {
221 Entry<Database, List<UpdateOperation<String, byte[]>>> only =
222 perPartitionUpdates.entrySet().iterator().next();
223 return only.getKey().atomicBatchUpdate(only.getValue());
224 }
225 }
226
227 @Override
228 public void setPartitioner(Partitioner<String> partitioner) {
229 this.partitioner = partitioner;
230 }
231
232 @Override
233 public CompletableFuture<PartitionedDatabase> open() {
234 return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
235 .values()
236 .stream()
237 .map(Database::open)
Madan Jampani09342702015-02-05 23:32:40 -0800238 .toArray(CompletableFuture[]::new))
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800239 .thenApply(v -> {
240 isOpen.set(true);
241 return this; }));
Madan Jampani94c23532015-02-05 17:40:01 -0800242
243 }
244
245 @Override
246 public CompletableFuture<Void> close() {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800247 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800248 CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
249 .values()
250 .stream()
251 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800252 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800253 CompletableFuture<Void> closeCoordinator = coordinator.close();
254 return closePartitions.thenCompose(v -> closeCoordinator);
255 }
256}