blob: 72f8c041d83d12a01564eda739efcd40cfa34255 [file] [log] [blame]
Madan Jampani94c23532015-02-05 17:40:01 -08001package org.onosproject.store.consistent.impl;
2
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Versioned;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
22
23/**
24 * A database that partitions the keys across one or more database partitions.
25 */
26public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
27
28 private Partitioner<String> partitioner;
29 private final ClusterCoordinator coordinator;
30 private final Map<String, Database> partitions = Maps.newConcurrentMap();
31
32 protected PartitionedDatabase(ClusterCoordinator coordinator) {
33 this.coordinator = coordinator;
34 }
35
36 @Override
37 public void registerPartition(String name, Database partition) {
38 partitions.put(name, partition);
39 }
40
41 @Override
42 public Map<String, Database> getRegisteredPartitions() {
43 return ImmutableMap.copyOf(partitions);
44 }
45
46 @Override
47 public CompletableFuture<Integer> size(String tableName) {
48 AtomicInteger totalSize = new AtomicInteger(0);
49 return CompletableFuture.allOf(partitions
50 .values()
51 .stream()
52 .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
53 .toArray(CompletableFuture[]::new))
54 .thenApply(v -> totalSize.get());
55 }
56
57 @Override
58 public CompletableFuture<Boolean> isEmpty(String tableName) {
59 return size(tableName).thenApply(size -> size == 0);
60 }
61
62 @Override
63 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
64 return partitioner.getPartition(tableName, key).containsKey(tableName, key);
65 }
66
67 @Override
68 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
69 AtomicBoolean containsValue = new AtomicBoolean(false);
70 return CompletableFuture.allOf(partitions
71 .values()
72 .stream()
73 .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
74 .toArray(CompletableFuture[]::new))
75 .thenApply(v -> containsValue.get());
76 }
77
78 @Override
79 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
80 return partitioner.getPartition(tableName, key).get(tableName, key);
81 }
82
83 @Override
84 public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
85 return partitioner.getPartition(tableName, key).put(tableName, key, value);
86 }
87
88 @Override
89 public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
90 return partitioner.getPartition(tableName, key).remove(tableName, key);
91 }
92
93 @Override
94 public CompletableFuture<Void> clear(String tableName) {
95 return CompletableFuture.allOf(partitions
96 .values()
97 .stream()
98 .map(p -> p.clear(tableName))
99 .toArray(CompletableFuture[]::new));
100 }
101
102 @Override
103 public CompletableFuture<Set<String>> keySet(String tableName) {
104 Set<String> keySet = Sets.newConcurrentHashSet();
105 return CompletableFuture.allOf(partitions
106 .values()
107 .stream()
108 .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
109 .toArray(CompletableFuture[]::new))
110 .thenApply(v -> keySet);
111 }
112
113 @Override
114 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
115 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
116 return CompletableFuture.allOf(partitions
117 .values()
118 .stream()
Madan Jampani393e0f02015-02-12 07:35:39 +0530119 .map(p -> p.values(tableName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800120 .toArray(CompletableFuture[]::new))
121 .thenApply(v -> values);
122 }
123
124 @Override
125 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
126 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
127 return CompletableFuture.allOf(partitions
128 .values()
129 .stream()
130 .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
131 .toArray(CompletableFuture[]::new))
132 .thenApply(v -> entrySet);
133 }
134
135 @Override
136 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
137 return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
138 }
139
140 @Override
141 public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
142 return partitioner.getPartition(tableName, key).remove(tableName, key, value);
143 }
144
145 @Override
146 public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
147 return partitioner.getPartition(tableName, key).remove(tableName, key, version);
148 }
149
150 @Override
151 public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
152 return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
153 }
154
155 @Override
156 public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
157 return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
158 }
159
160 @Override
161 public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
162 Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
163 for (UpdateOperation<String, byte[]> update : updates) {
164 Database partition = partitioner.getPartition(update.tableName(), update.key());
165 List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
166 if (partitionUpdates == null) {
167 partitionUpdates = Lists.newArrayList();
168 perPartitionUpdates.put(partition, partitionUpdates);
169 }
170 partitionUpdates.add(update);
171 }
172 if (perPartitionUpdates.size() > 1) {
173 // TODO
174 throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
175 } else {
176 Entry<Database, List<UpdateOperation<String, byte[]>>> only =
177 perPartitionUpdates.entrySet().iterator().next();
178 return only.getKey().atomicBatchUpdate(only.getValue());
179 }
180 }
181
182 @Override
183 public void setPartitioner(Partitioner<String> partitioner) {
184 this.partitioner = partitioner;
185 }
186
187 @Override
188 public CompletableFuture<PartitionedDatabase> open() {
189 return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
190 .values()
191 .stream()
192 .map(Database::open)
Madan Jampani09342702015-02-05 23:32:40 -0800193 .toArray(CompletableFuture[]::new))
Madan Jampani94c23532015-02-05 17:40:01 -0800194 .thenApply(v -> this));
195
196 }
197
198 @Override
199 public CompletableFuture<Void> close() {
200 CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
201 .values()
202 .stream()
203 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800204 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800205 CompletableFuture<Void> closeCoordinator = coordinator.close();
206 return closePartitions.thenCompose(v -> closeCoordinator);
207 }
208}