blob: d35aca2bc5816007993d546aec5817161ee5527b [file] [log] [blame]
Madan Jampani94c23532015-02-05 17:40:01 -08001package org.onosproject.store.consistent.impl;
2
3import java.util.Collection;
4import java.util.List;
5import java.util.Map;
6import java.util.Map.Entry;
7import java.util.Set;
8import java.util.concurrent.CompletableFuture;
9import java.util.concurrent.CopyOnWriteArrayList;
10import java.util.concurrent.atomic.AtomicBoolean;
11import java.util.concurrent.atomic.AtomicInteger;
12import java.util.stream.Collectors;
13
14import com.google.common.collect.ImmutableMap;
15import com.google.common.collect.Lists;
16import com.google.common.collect.Maps;
17import com.google.common.collect.Sets;
18
19import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
20
21/**
22 * A database that partitions the keys across one or more database partitions.
23 */
24public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
25
26 private Partitioner<String> partitioner;
27 private final ClusterCoordinator coordinator;
28 private final Map<String, Database> partitions = Maps.newConcurrentMap();
29
30 protected PartitionedDatabase(ClusterCoordinator coordinator) {
31 this.coordinator = coordinator;
32 }
33
34 @Override
35 public void registerPartition(String name, Database partition) {
36 partitions.put(name, partition);
37 }
38
39 @Override
40 public Map<String, Database> getRegisteredPartitions() {
41 return ImmutableMap.copyOf(partitions);
42 }
43
44 @Override
45 public CompletableFuture<Integer> size(String tableName) {
46 AtomicInteger totalSize = new AtomicInteger(0);
47 return CompletableFuture.allOf(partitions
48 .values()
49 .stream()
50 .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
51 .toArray(CompletableFuture[]::new))
52 .thenApply(v -> totalSize.get());
53 }
54
55 @Override
56 public CompletableFuture<Boolean> isEmpty(String tableName) {
57 return size(tableName).thenApply(size -> size == 0);
58 }
59
60 @Override
61 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
62 return partitioner.getPartition(tableName, key).containsKey(tableName, key);
63 }
64
65 @Override
66 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
67 AtomicBoolean containsValue = new AtomicBoolean(false);
68 return CompletableFuture.allOf(partitions
69 .values()
70 .stream()
71 .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
72 .toArray(CompletableFuture[]::new))
73 .thenApply(v -> containsValue.get());
74 }
75
76 @Override
77 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
78 return partitioner.getPartition(tableName, key).get(tableName, key);
79 }
80
81 @Override
82 public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
83 return partitioner.getPartition(tableName, key).put(tableName, key, value);
84 }
85
86 @Override
87 public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
88 return partitioner.getPartition(tableName, key).remove(tableName, key);
89 }
90
91 @Override
92 public CompletableFuture<Void> clear(String tableName) {
93 return CompletableFuture.allOf(partitions
94 .values()
95 .stream()
96 .map(p -> p.clear(tableName))
97 .toArray(CompletableFuture[]::new));
98 }
99
100 @Override
101 public CompletableFuture<Set<String>> keySet(String tableName) {
102 Set<String> keySet = Sets.newConcurrentHashSet();
103 return CompletableFuture.allOf(partitions
104 .values()
105 .stream()
106 .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
107 .toArray(CompletableFuture[]::new))
108 .thenApply(v -> keySet);
109 }
110
111 @Override
112 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
113 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
114 return CompletableFuture.allOf(partitions
115 .values()
116 .stream()
117 .map(p -> p.values(tableName))
118 .toArray(CompletableFuture[]::new))
119 .thenApply(v -> values);
120 }
121
122 @Override
123 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
124 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
125 return CompletableFuture.allOf(partitions
126 .values()
127 .stream()
128 .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
129 .toArray(CompletableFuture[]::new))
130 .thenApply(v -> entrySet);
131 }
132
133 @Override
134 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
135 return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
136 }
137
138 @Override
139 public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
140 return partitioner.getPartition(tableName, key).remove(tableName, key, value);
141 }
142
143 @Override
144 public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
145 return partitioner.getPartition(tableName, key).remove(tableName, key, version);
146 }
147
148 @Override
149 public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
150 return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
151 }
152
153 @Override
154 public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
155 return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
156 }
157
158 @Override
159 public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
160 Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
161 for (UpdateOperation<String, byte[]> update : updates) {
162 Database partition = partitioner.getPartition(update.tableName(), update.key());
163 List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
164 if (partitionUpdates == null) {
165 partitionUpdates = Lists.newArrayList();
166 perPartitionUpdates.put(partition, partitionUpdates);
167 }
168 partitionUpdates.add(update);
169 }
170 if (perPartitionUpdates.size() > 1) {
171 // TODO
172 throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
173 } else {
174 Entry<Database, List<UpdateOperation<String, byte[]>>> only =
175 perPartitionUpdates.entrySet().iterator().next();
176 return only.getKey().atomicBatchUpdate(only.getValue());
177 }
178 }
179
180 @Override
181 public void setPartitioner(Partitioner<String> partitioner) {
182 this.partitioner = partitioner;
183 }
184
185 @Override
186 public CompletableFuture<PartitionedDatabase> open() {
187 return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
188 .values()
189 .stream()
190 .map(Database::open)
191 .collect(Collectors.toList())
192 .toArray(new CompletableFuture[partitions.size()]))
193 .thenApply(v -> this));
194
195 }
196
197 @Override
198 public CompletableFuture<Void> close() {
199 CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
200 .values()
201 .stream()
202 .map(database -> database.close())
203 .collect(Collectors.toList())
204 .toArray(new CompletableFuture[partitions.size()]));
205 CompletableFuture<Void> closeCoordinator = coordinator.close();
206 return closePartitions.thenCompose(v -> closeCoordinator);
207 }
208}